You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/05/19 11:23:46 UTC
incubator-tinkerpop git commit: added traversal-based vote strength
test to PeerPressureTest. Cleaned up internal classes of
TraversalVertexProgram a bit for better organization. Generalized Phase enum
so it can be used by other computer-based classes. [...]
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1305 b100f033e -> 5bcf0b01c
added traversal-based vote strength test to PeerPressureTest. Cleaned up internal classes of TraversalVertexProgram a bit for better organization. Generalized Phase enum so it can be used by other computer-based classes. added comments to TraversalVertexProgram.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/5bcf0b01
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/5bcf0b01
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/5bcf0b01
Branch: refs/heads/TINKERPOP-1305
Commit: 5bcf0b01c7bfc77e9eea8fdb7b2c4c54a4fc0000
Parents: b100f03
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu May 19 05:23:36 2016 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu May 19 05:23:36 2016 -0600
----------------------------------------------------------------------
docs/src/reference/the-traversal.asciidoc | 17 +-
.../gremlin/process/computer/ProgramPhase.java | 40 ++++
.../peerpressure/PeerPressureVertexProgram.java | 21 +-
.../traversal/MemoryTraversalSideEffects.java | 27 +--
.../computer/traversal/SingleMessenger.java | 49 -----
.../traversal/TraversalVertexProgram.java | 18 +-
.../computer/traversal/TraverserExecutor.java | 216 ------------------
.../computer/traversal/WorkerExecutor.java | 220 +++++++++++++++++++
.../step/map/PeerPressureVertexProgramStep.java | 9 +-
.../process/computer/util/EmptyMemory.java | 58 ++++-
.../process/computer/util/SingleMessenger.java | 49 +++++
.../step/map/GroovyPeerPressureTest.groovy | 5 +
.../traversal/step/map/PeerPressureTest.java | 31 +++
.../process/traversal/step/map/ProgramTest.java | 11 +-
.../SparkStarBarrierInterceptor.java | 3 +-
15 files changed, 460 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/docs/src/reference/the-traversal.asciidoc
----------------------------------------------------------------------
diff --git a/docs/src/reference/the-traversal.asciidoc b/docs/src/reference/the-traversal.asciidoc
index 3394e90..4c74795 100644
--- a/docs/src/reference/the-traversal.asciidoc
+++ b/docs/src/reference/the-traversal.asciidoc
@@ -1434,6 +1434,10 @@ vertex program includes:
The user supplied `VertexProgram` can leverage that information accordingly within their vertex program. Example uses
are provided below.
+WARNING: Developing a `VertexProgram` is for expert users. Moreover, developing one that can be used effectively within
+a traversal requires yet more expertise. This information is recommended to advanced users with a deep understanding of the
+mechanics of Gremlin OLAP (<<graphcomputer,`GraphComputer`>>).
+
[source,java]
----
private TraverserSet<Object> haltedTraversers;
@@ -1447,7 +1451,7 @@ public void loadState(final Graph graph, final Configuration configuration) {
// if master-traversal traversers may be propagated, create a memory compute key
this.memoryComputeKeys.add(MemoryComputeKey.of(TraversalVertexProgram.HALTED_TRAVERSERS, Operator.addAll, false, false));
// returns an empty traverser set if there are no halted traversers
- this.haltedTraversers = TraversalVertexProgram.getHaltedTraversers(configuration);
+ this.haltedTraversers = TraversalVertexProgram.loadHaltedTraversers(configuration);
}
public void storeState(final Configuration configuration) {
@@ -1457,22 +1461,21 @@ public void storeState(final Configuration configuration) {
}
public void setup(final Memory memory) {
- if(null != this.haltedTraversers) {
+ if(!this.haltedTraversers.isEmpty()) {
// do what you like with the halted master traversal traversers
}
// once used, no need to keep that information around (master)
- if(null != this.haltedTraversers)
- this.haltedTraversers.clear()
+ this.haltedTraversers = null;
}
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
// once used, no need to keep that information around (workers)
if(null != this.haltedTraversers)
- this.haltedTraversers.clear()
+ this.haltedTraversers = null;
if(vertex.property(TraversalVertexProgram.HALTED_TRAVERSERS).isPresent()) {
// haltedTraversers in execute() represent worker-traversal traversers
// for example, from a traversal of the form g.V().out().program(...)
- TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS) :
+ TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
// create a new halted traverser set that can be used by the next OLAP job in the chain
// these are worker-traversers that are distributed throughout the graph
TraverserSet<Object> newHaltedTraversers = new TraverserSet<>();
@@ -1480,7 +1483,7 @@ public void execute(final Vertex vertex, final Messenger messenger, final Memory
newHaltedTraversers.add(traverser.split(traverser.get().toString(), this.programStep));
});
vertex.property(VertexProperty.Cardinality.single, TraversalVertexProgram.HALTED_TRAVERSERS, newHaltedTraversers);
- // it is possible to create master-traversers that are localized to the master traversal (thread)
+ // it is possible to create master-traversers that are localized to the master traversal (this is how results are ultimately delivered back to the user)
memory.add(TraversalVertexProgram.HALTED_TRAVERSERS,
new TraverserSet<>(this.traversal().get().getTraverserGenerator().generate("an example", this.programStep, 1l)));
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ProgramPhase.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ProgramPhase.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ProgramPhase.java
new file mode 100644
index 0000000..ce39505
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/ProgramPhase.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.computer;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public enum ProgramPhase {
+
+ SETUP,
+ WORKER_ITERATION_START,
+ EXECUTE,
+ WORKER_ITERATION_END,
+ TERMINATE;
+
+ public boolean masterState() {
+ return this == SETUP || this == TERMINATE;
+ }
+
+ public boolean workerState() {
+ return !this.masterState();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
index 8834882..56de255 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/clustering/peerpressure/PeerPressureVertexProgram.java
@@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.MapHelper;
import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.util.ScriptTraversal;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
@@ -62,6 +63,7 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
public static final String CLUSTER = "gremlin.peerPressureVertexProgram.cluster";
private static final String VOTE_STRENGTH = "gremlin.peerPressureVertexProgram.voteStrength";
+ private static final String INITIAL_VOTE_STRENGTH_TRAVERSAL = "gremlin.pageRankVertexProgram.initialVoteStrengthTraversal";
private static final String PROPERTY = "gremlin.peerPressureVertexProgram.property";
private static final String MAX_ITERATIONS = "gremlin.peerPressureVertexProgram.maxIterations";
private static final String DISTRIBUTE_VOTE = "gremlin.peerPressureVertexProgram.distributeVote";
@@ -69,6 +71,7 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
private static final String VOTE_TO_HALT = "gremlin.peerPressureVertexProgram.voteToHalt";
private PureTraversal<Vertex, Edge> edgeTraversal = null;
+ private PureTraversal<Vertex, ? extends Number> initialVoteStrengthTraversal = null;
private int maxIterations = 30;
private boolean distributeVote = false;
private String property = CLUSTER;
@@ -81,6 +84,8 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
@Override
public void loadState(final Graph graph, final Configuration configuration) {
+ if (configuration.containsKey(INITIAL_VOTE_STRENGTH_TRAVERSAL))
+ this.initialVoteStrengthTraversal = PureTraversal.loadState(configuration, INITIAL_VOTE_STRENGTH_TRAVERSAL, graph);
if (configuration.containsKey(EDGE_TRAVERSAL)) {
this.edgeTraversal = PureTraversal.loadState(configuration, EDGE_TRAVERSAL, graph);
this.voteScope = MessageScope.Local.of(() -> this.edgeTraversal.get().clone());
@@ -99,6 +104,8 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
configuration.setProperty(DISTRIBUTE_VOTE, this.distributeVote);
if (null != this.edgeTraversal)
this.edgeTraversal.storeState(configuration, EDGE_TRAVERSAL);
+ if (null != this.initialVoteStrengthTraversal)
+ this.initialVoteStrengthTraversal.storeState(configuration, INITIAL_VOTE_STRENGTH_TRAVERSAL);
}
@Override
@@ -137,14 +144,19 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
if (this.distributeVote) {
messenger.sendMessage(this.countScope, Pair.with('c', 1.0d));
} else {
- double voteStrength = 1.0d;
+ double voteStrength = (null == this.initialVoteStrengthTraversal ?
+ 1.0d :
+ TraversalUtil.apply(vertex, this.initialVoteStrengthTraversal.get()).doubleValue());
vertex.property(VertexProperty.Cardinality.single, this.property, vertex.id());
vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
memory.add(VOTE_TO_HALT, false);
}
} else if (1 == memory.getIteration() && this.distributeVote) {
- double voteStrength = 1.0d / IteratorUtils.reduce(IteratorUtils.map(messenger.receiveMessages(), Pair::getValue1), 0.0d, (a, b) -> a + b);
+ double voteStrength = (null == this.initialVoteStrengthTraversal ?
+ 1.0d :
+ TraversalUtil.apply(vertex, this.initialVoteStrengthTraversal.get()).doubleValue()) /
+ IteratorUtils.reduce(IteratorUtils.map(messenger.receiveMessages(), Pair::getValue1), 0.0d, (a, b) -> a + b);
vertex.property(VertexProperty.Cardinality.single, this.property, vertex.id());
vertex.property(VertexProperty.Cardinality.single, VOTE_STRENGTH, voteStrength);
messenger.sendMessage(this.voteScope, new Pair<>((Serializable) vertex.id(), voteStrength));
@@ -227,6 +239,11 @@ public class PeerPressureVertexProgram extends StaticVertexProgram<Pair<Serializ
return this;
}
+ public Builder initialVoteStrength(final Traversal.Admin<Vertex, ? extends Number> initialVoteStrengthTraversal) {
+ PureTraversal.storeState(this.configuration, INITIAL_VOTE_STRENGTH_TRAVERSAL, initialVoteStrengthTraversal);
+ return this;
+ }
+
/**
* @deprecated As of release 3.2.0, replaced by {@link PeerPressureVertexProgram.Builder#edges(Traversal.Admin)}
*/
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
index 23d33f1..bf9f8c0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/MemoryTraversalSideEffects.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
+import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
@@ -38,23 +39,7 @@ public final class MemoryTraversalSideEffects implements TraversalSideEffects {
private TraversalSideEffects sideEffects;
private Memory memory;
- private State state;
-
- public enum State {
- SETUP,
- WORKER_ITERATION_START,
- EXECUTE,
- WORKER_ITERATION_END,
- TERMINATE;
-
- public boolean masterState() {
- return this == SETUP || this == TERMINATE;
- }
-
- public boolean workerState() {
- return !this.masterState();
- }
- }
+ private ProgramPhase phase;
private MemoryTraversalSideEffects() {
// for serialization
@@ -93,7 +78,7 @@ public final class MemoryTraversalSideEffects implements TraversalSideEffects {
@Override
public void add(final String key, final Object value) {
- if (this.state.workerState())
+ if (this.phase.workerState())
this.memory.add(key, value);
else
this.memory.set(key, this.sideEffects.getReducer(key).apply(this.memory.get(key), value));
@@ -168,20 +153,20 @@ public final class MemoryTraversalSideEffects implements TraversalSideEffects {
}
public void storeSideEffectsInMemory() {
- if (this.state.workerState())
+ if (this.phase.workerState())
this.sideEffects.forEach(this.memory::add);
else
this.sideEffects.forEach(this.memory::set);
}
- public static void setMemorySideEffects(final Traversal.Admin<?, ?> traversal, final Memory memory, final State state) {
+ public static void setMemorySideEffects(final Traversal.Admin<?, ?> traversal, final Memory memory, final ProgramPhase phase) {
final TraversalSideEffects sideEffects = traversal.getSideEffects();
if (!(sideEffects instanceof MemoryTraversalSideEffects)) {
traversal.setSideEffects(new MemoryTraversalSideEffects(sideEffects));
}
final MemoryTraversalSideEffects memoryTraversalSideEffects = ((MemoryTraversalSideEffects) traversal.getSideEffects());
memoryTraversalSideEffects.memory = memory;
- memoryTraversalSideEffects.state = state;
+ memoryTraversalSideEffects.phase = phase;
}
public static Set<MemoryComputeKey> getMemoryComputeKeys(final Traversal.Admin<?, ?> traversal) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java
deleted file mode 100644
index 26ed8a4..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/SingleMessenger.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.process.computer.traversal;
-
-import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
-import org.apache.tinkerpop.gremlin.process.computer.Messenger;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-
-import java.util.Iterator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SingleMessenger<M> implements Messenger<M> {
-
- private final Messenger<M> baseMessenger;
- private final M message;
-
- public SingleMessenger(final Messenger<M> baseMessenger, final M message) {
- this.baseMessenger = baseMessenger;
- this.message = message;
- }
-
- @Override
- public Iterator<M> receiveMessages() {
- return IteratorUtils.of(this.message);
- }
-
- @Override
- public void sendMessage(final MessageScope messageScope, final M message) {
- this.baseMessenger.sendMessage(messageScope, message);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index c7e7ef9..266426f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -27,12 +27,14 @@ import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.ComputerResultStep;
import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
import org.apache.tinkerpop.gremlin.process.computer.util.AbstractVertexProgramBuilder;
+import org.apache.tinkerpop.gremlin.process.computer.util.SingleMessenger;
import org.apache.tinkerpop.gremlin.process.computer.util.VertexProgramHelper;
import org.apache.tinkerpop.gremlin.process.traversal.Operator;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
@@ -195,13 +197,13 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
@Override
public void setup(final Memory memory) {
// memory is local
- MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.SETUP);
+ MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.SETUP);
((MemoryTraversalSideEffects) this.traversal.get().getSideEffects()).storeSideEffectsInMemory();
memory.set(VOTE_TO_HALT, true);
memory.set(MUTATED_MEMORY_KEYS, new HashSet<>());
memory.set(COMPLETED_BARRIERS, new HashSet<>());
- // if halted traversers are being sent from a previous VertexProgram in an OLAP chain (non-distributed traversers), get them into the stream
- if (null != this.haltedTraversers && !this.haltedTraversers.isEmpty()) {
+ // if halted traversers are being sent from a previous VertexProgram in an OLAP chain (non-distributed traversers), get them into the flow
+ if (!this.haltedTraversers.isEmpty()) {
final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
IteratorUtils.removeOnNext(this.haltedTraversers.iterator()).forEachRemaining(traverser -> {
traverser.setStepId(this.traversal.get().getStartStep().getId());
@@ -231,7 +233,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
if (null != this.haltedTraversers)
this.haltedTraversers = null;
// memory is distributed
- MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.EXECUTE);
+ MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.EXECUTE);
// if a barrier was completed in another worker, it is also completed here (ensure distributed barriers are synchronized)
final Set<String> completedBarriers = memory.get(COMPLETED_BARRIERS);
for (final String stepId : completedBarriers) {
@@ -245,11 +247,13 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
//////////////////
if (memory.isInitialIteration()) { // ITERATION 1
final TraverserSet<Object> activeTraversers = new TraverserSet<>();
+ // if halted traversers are being sent from a previous VertexProgram in an OLAP chain (distributed traversers), get them into the flow
IteratorUtils.removeOnNext(haltedTraversers.iterator()).forEachRemaining(traverser -> {
traverser.setStepId(this.traversal.get().getStartStep().getId());
activeTraversers.add(traverser);
});
assert haltedTraversers.isEmpty();
+ // for g.V()/E()
if (this.traversal.get().getStartStep() instanceof GraphStep) {
final GraphStep<Element, Element> graphStep = (GraphStep<Element, Element>) this.traversal.get().getStartStep();
graphStep.reset();
@@ -270,9 +274,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
activeTraversers.add((Traverser.Admin) traverser);
});
}
- memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers));
+ memory.add(VOTE_TO_HALT, activeTraversers.isEmpty() || WorkerExecutor.execute(vertex, new SingleMessenger<>(messenger, activeTraversers), this.traversalMatrix, memory, this.returnHaltedTraversers));
} else { // ITERATION 1+
- memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix, memory, this.returnHaltedTraversers));
+ memory.add(VOTE_TO_HALT, WorkerExecutor.execute(vertex, messenger, this.traversalMatrix, memory, this.returnHaltedTraversers));
}
if (this.returnHaltedTraversers || haltedTraversers.isEmpty())
vertex.<TraverserSet>property(HALTED_TRAVERSERS).remove();
@@ -281,7 +285,7 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
@Override
public boolean terminate(final Memory memory) {
// memory is local
- MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.TERMINATE);
+ MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.TERMINATE);
final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT);
memory.set(VOTE_TO_HALT, true);
memory.set(ACTIVE_TRAVERSERS, new TraverserSet<>());
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
deleted file mode 100644
index c5f9bb5..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
+++ /dev/null
@@ -1,216 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.process.computer.traversal;
-
-import org.apache.tinkerpop.gremlin.process.computer.Memory;
-import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
-import org.apache.tinkerpop.gremlin.process.computer.Messenger;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Element;
-import org.apache.tinkerpop.gremlin.structure.Property;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.structure.util.Attachable;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class TraverserExecutor {
-
- public static boolean execute(final Vertex vertex, final Messenger<TraverserSet<Object>> messenger, final TraversalMatrix<?, ?> traversalMatrix, final Memory memory, final boolean returnHaltedTraversers) {
-
- final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
- final AtomicBoolean voteToHalt = new AtomicBoolean(true);
- final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
- final TraverserSet<Object> activeTraversers = new TraverserSet<>();
- final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
-
- ////////////////////////////////
- // GENERATE LOCAL TRAVERSERS //
- ///////////////////////////////
-
- // these are traversers that are going from OLTP to OLAP
- final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
- final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator();
- while (iterator.hasNext()) {
- final Traverser.Admin<Object> traverser = iterator.next();
- if (vertex.equals(TraverserExecutor.getHostingVertex(traverser.get()))) {
- // iterator.remove(); ConcurrentModificationException
- traverser.attach(Attachable.Method.get(vertex));
- traverser.setSideEffects(traversalSideEffects);
- toProcessTraversers.add(traverser);
- }
- }
- // these are traversers that exist from from a local barrier
- vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).ifPresent(previousActiveTraversers -> {
- IteratorUtils.removeOnNext(previousActiveTraversers.iterator()).forEachRemaining(traverser -> {
- traverser.attach(Attachable.Method.get(vertex));
- traverser.setSideEffects(traversalSideEffects);
- toProcessTraversers.add(traverser);
- });
- vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove();
- });
- // these are traversers that have been messaged to the vertex
- final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages();
- while (messages.hasNext()) {
- final Iterator<Traverser.Admin<Object>> traversers = messages.next().iterator();
- while (traversers.hasNext()) {
- final Traverser.Admin<Object> traverser = traversers.next();
- traversers.remove();
- if (traverser.isHalted()) {
- if (returnHaltedTraversers)
- memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
- else
- haltedTraversers.add(traverser);
- } else {
- traverser.attach(Attachable.Method.get(vertex));
- traverser.setSideEffects(traversalSideEffects);
- toProcessTraversers.add(traverser);
- }
- }
- }
-
- ///////////////////////////////
- // PROCESS LOCAL TRAVERSERS //
- //////////////////////////////
-
- // while there are still local traversers, process them until they leave the vertex or halt (i.e. isHalted()).
- while (!toProcessTraversers.isEmpty()) {
- // process local traversers and if alive, repeat, else halt.
- Step<Object, Object> previousStep = EmptyStep.instance();
- Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator();
- while (traversers.hasNext()) {
- final Traverser.Admin<Object> traverser = traversers.next();
- traversers.remove();
- final Step<Object, Object> currentStep = traversalMatrix.getStepById(traverser.getStepId());
- if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep))
- TraverserExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers);
- currentStep.addStart(traverser);
- previousStep = currentStep;
- }
- TraverserExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers);
- assert toProcessTraversers.isEmpty();
- // process all the local objects and send messages or store locally again
- if (!activeTraversers.isEmpty()) {
- traversers = activeTraversers.iterator();
- while (traversers.hasNext()) {
- final Traverser.Admin<Object> traverser = traversers.next();
- traversers.remove();
- if (traverser.get() instanceof Element || traverser.get() instanceof Property) { // GRAPH OBJECT
- // if the element is remote, then message, else store it locally for re-processing
- final Vertex hostingVertex = TraverserExecutor.getHostingVertex(traverser.get());
- if (!vertex.equals(hostingVertex)) { // necessary for path access
- voteToHalt.set(false);
- traverser.detach();
- messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser));
- } else {
- if (traverser.get() instanceof Attachable) // necessary for path access to local object
- traverser.attach(Attachable.Method.get(vertex));
- toProcessTraversers.add(traverser);
- }
- } else // STANDARD OBJECT
- toProcessTraversers.add(traverser);
- }
- assert activeTraversers.isEmpty();
- }
- }
- return voteToHalt.get();
- }
-
- private static void drainStep(final Vertex vertex, final Step<Object, Object> step, final TraverserSet<Object> activeTraversers, final TraverserSet<Object> haltedTraversers, final Memory memory, final boolean returnHaltedTraversers) {
- if (step instanceof Barrier) {
- if (step instanceof Bypassing)
- ((Bypassing) step).setBypass(true);
- if (step instanceof LocalBarrier) {
- final LocalBarrier<Object> barrier = (LocalBarrier<Object>) step;
- final TraverserSet<Object> traverserSet = vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).orElse(new TraverserSet<>());
- vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS, traverserSet);
- while (barrier.hasNextBarrier()) {
- final TraverserSet<Object> barrierSet = barrier.nextBarrier();
- IteratorUtils.removeOnNext(barrierSet.iterator()).forEachRemaining(traverser -> {
- traverser.addLabels(step.getLabels()); // this might need to be generalized for working with global barriers too
- if (traverser.isHalted() &&
- (returnHaltedTraversers ||
- (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
- getHostingVertex(traverser.get()).equals(vertex))) {
- traverser.detach();
- if (returnHaltedTraversers)
- memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
- else
- haltedTraversers.add(traverser);
- } else {
- traverser.detach();
- traverserSet.add(traverser);
- }
- });
- }
- memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
- } else {
- final Barrier barrier = (Barrier) step;
- while (barrier.hasNextBarrier()) {
- memory.add(step.getId(), barrier.nextBarrier());
- }
- memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
- }
- } else { // LOCAL PROCESSING
- step.forEachRemaining(traverser -> {
- if (traverser.isHalted() &&
- (returnHaltedTraversers ||
- (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
- getHostingVertex(traverser.get()).equals(vertex))) {
- traverser.detach();
- if (returnHaltedTraversers)
- memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
- else
- haltedTraversers.add(traverser);
- } else {
- activeTraversers.add(traverser);
- }
- });
- }
- }
-
- private static Vertex getHostingVertex(final Object object) {
- Object obj = object;
- while (true) {
- if (obj instanceof Vertex)
- return (Vertex) obj;
- else if (obj instanceof Edge)
- return ((Edge) obj).outVertex();
- else if (obj instanceof Property)
- obj = ((Property) obj).element();
- else
- throw new IllegalStateException("The host of the object is unknown: " + obj.toString() + ':' + obj.getClass().getCanonicalName());
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
new file mode 100644
index 0000000..c4bd659
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/WorkerExecutor.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.computer.traversal;
+
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Property;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class WorkerExecutor {
+
+ private WorkerExecutor() {
+
+ }
+
+ protected static boolean execute(final Vertex vertex, final Messenger<TraverserSet<Object>> messenger, final TraversalMatrix<?, ?> traversalMatrix, final Memory memory, final boolean returnHaltedTraversers) {
+
+ final TraversalSideEffects traversalSideEffects = traversalMatrix.getTraversal().getSideEffects();
+ final AtomicBoolean voteToHalt = new AtomicBoolean(true);
+ final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
+ final TraverserSet<Object> activeTraversers = new TraverserSet<>();
+ final TraverserSet<Object> toProcessTraversers = new TraverserSet<>();
+
+ ////////////////////////////////
+ // GENERATE LOCAL TRAVERSERS //
+ ///////////////////////////////
+
+ // these are traversers that are going from OLTP to OLAP
+ final TraverserSet<Object> maybeActiveTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
+ final Iterator<Traverser.Admin<Object>> iterator = maybeActiveTraversers.iterator();
+ while (iterator.hasNext()) {
+ final Traverser.Admin<Object> traverser = iterator.next();
+ if (vertex.equals(WorkerExecutor.getHostingVertex(traverser.get()))) {
+ // iterator.remove(); ConcurrentModificationException
+ traverser.attach(Attachable.Method.get(vertex));
+ traverser.setSideEffects(traversalSideEffects);
+ toProcessTraversers.add(traverser);
+ }
+ }
+ // these are traversers that exist from a local barrier
+ vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).ifPresent(previousActiveTraversers -> {
+ IteratorUtils.removeOnNext(previousActiveTraversers.iterator()).forEachRemaining(traverser -> {
+ traverser.attach(Attachable.Method.get(vertex));
+ traverser.setSideEffects(traversalSideEffects);
+ toProcessTraversers.add(traverser);
+ });
+ vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS).remove();
+ });
+ // these are traversers that have been messaged to the vertex
+ final Iterator<TraverserSet<Object>> messages = messenger.receiveMessages();
+ while (messages.hasNext()) {
+ final Iterator<Traverser.Admin<Object>> traversers = messages.next().iterator();
+ while (traversers.hasNext()) {
+ final Traverser.Admin<Object> traverser = traversers.next();
+ traversers.remove();
+ if (traverser.isHalted()) {
+ if (returnHaltedTraversers)
+ memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
+ else
+ haltedTraversers.add(traverser);
+ } else {
+ traverser.attach(Attachable.Method.get(vertex));
+ traverser.setSideEffects(traversalSideEffects);
+ toProcessTraversers.add(traverser);
+ }
+ }
+ }
+
+ ///////////////////////////////
+ // PROCESS LOCAL TRAVERSERS //
+ //////////////////////////////
+
+ // while there are still local traversers, process them until they leave the vertex or halt (i.e. isHalted()).
+ while (!toProcessTraversers.isEmpty()) {
+ // process local traversers and if alive, repeat, else halt.
+ Step<Object, Object> previousStep = EmptyStep.instance();
+ Iterator<Traverser.Admin<Object>> traversers = toProcessTraversers.iterator();
+ while (traversers.hasNext()) {
+ final Traverser.Admin<Object> traverser = traversers.next();
+ traversers.remove();
+ final Step<Object, Object> currentStep = traversalMatrix.getStepById(traverser.getStepId());
+ if (!currentStep.getId().equals(previousStep.getId()) && !(previousStep instanceof EmptyStep))
+ WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers);
+ currentStep.addStart(traverser);
+ previousStep = currentStep;
+ }
+ WorkerExecutor.drainStep(vertex, previousStep, activeTraversers, haltedTraversers, memory, returnHaltedTraversers);
+ assert toProcessTraversers.isEmpty();
+ // process all the local objects and send messages or store locally again
+ if (!activeTraversers.isEmpty()) {
+ traversers = activeTraversers.iterator();
+ while (traversers.hasNext()) {
+ final Traverser.Admin<Object> traverser = traversers.next();
+ traversers.remove();
+ if (traverser.get() instanceof Element || traverser.get() instanceof Property) { // GRAPH OBJECT
+ // if the element is remote, then message, else store it locally for re-processing
+ final Vertex hostingVertex = WorkerExecutor.getHostingVertex(traverser.get());
+ if (!vertex.equals(hostingVertex)) { // necessary for path access
+ voteToHalt.set(false);
+ traverser.detach();
+ messenger.sendMessage(MessageScope.Global.of(hostingVertex), new TraverserSet<>(traverser));
+ } else {
+ if (traverser.get() instanceof Attachable) // necessary for path access to local object
+ traverser.attach(Attachable.Method.get(vertex));
+ toProcessTraversers.add(traverser);
+ }
+ } else // STANDARD OBJECT
+ toProcessTraversers.add(traverser);
+ }
+ assert activeTraversers.isEmpty();
+ }
+ }
+ return voteToHalt.get();
+ }
+
+ private static void drainStep(final Vertex vertex, final Step<Object, Object> step, final TraverserSet<Object> activeTraversers, final TraverserSet<Object> haltedTraversers, final Memory memory, final boolean returnHaltedTraversers) {
+ if (step instanceof Barrier) {
+ if (step instanceof Bypassing)
+ ((Bypassing) step).setBypass(true);
+ if (step instanceof LocalBarrier) {
+ final LocalBarrier<Object> barrier = (LocalBarrier<Object>) step;
+ final TraverserSet<Object> traverserSet = vertex.<TraverserSet<Object>>property(TraversalVertexProgram.ACTIVE_TRAVERSERS).orElse(new TraverserSet<>());
+ vertex.property(TraversalVertexProgram.ACTIVE_TRAVERSERS, traverserSet);
+ while (barrier.hasNextBarrier()) {
+ final TraverserSet<Object> barrierSet = barrier.nextBarrier();
+ IteratorUtils.removeOnNext(barrierSet.iterator()).forEachRemaining(traverser -> {
+ traverser.addLabels(step.getLabels()); // this might need to be generalized for working with global barriers too
+ if (traverser.isHalted() &&
+ (returnHaltedTraversers ||
+ (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
+ getHostingVertex(traverser.get()).equals(vertex))) {
+ traverser.detach();
+ if (returnHaltedTraversers)
+ memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
+ else
+ haltedTraversers.add(traverser);
+ } else {
+ traverser.detach();
+ traverserSet.add(traverser);
+ }
+ });
+ }
+ memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
+ } else {
+ final Barrier barrier = (Barrier) step;
+ while (barrier.hasNextBarrier()) {
+ memory.add(step.getId(), barrier.nextBarrier());
+ }
+ memory.add(TraversalVertexProgram.MUTATED_MEMORY_KEYS, new HashSet<>(Collections.singleton(step.getId())));
+ }
+ } else { // LOCAL PROCESSING
+ step.forEachRemaining(traverser -> {
+ if (traverser.isHalted() &&
+ (returnHaltedTraversers ||
+ (!(traverser.get() instanceof Element) && !(traverser.get() instanceof Property)) ||
+ getHostingVertex(traverser.get()).equals(vertex))) {
+ traverser.detach();
+ if (returnHaltedTraversers)
+ memory.add(TraversalVertexProgram.HALTED_TRAVERSERS, new TraverserSet<>(traverser));
+ else
+ haltedTraversers.add(traverser);
+ } else {
+ activeTraversers.add(traverser);
+ }
+ });
+ }
+ }
+
+ private static Vertex getHostingVertex(final Object object) {
+ Object obj = object;
+ while (true) {
+ if (obj instanceof Vertex)
+ return (Vertex) obj;
+ else if (obj instanceof Edge)
+ return ((Edge) obj).outVertex();
+ else if (obj instanceof Property)
+ obj = ((Property) obj).element();
+ else
+ throw new IllegalStateException("The host of the object is unknown: " + obj.toString() + ':' + obj.getClass().getCanonicalName());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
index 7bd726e..0ea5112 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/PeerPressureVertexProgramStep.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.computer.traversal.step.map;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.clustering.peerpressure.PeerPressureVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.lambda.HaltedTraversersCountTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
@@ -87,11 +88,13 @@ public final class PeerPressureVertexProgramStep extends VertexProgramStep imple
public PeerPressureVertexProgram generateProgram(final Graph graph, final Memory memory) {
final Traversal.Admin<Vertex, Edge> detachedTraversal = this.edgeTraversal.getPure();
detachedTraversal.setStrategies(TraversalStrategies.GlobalCache.getStrategies(graph.getClass()));
- return PeerPressureVertexProgram.build()
+ final PeerPressureVertexProgram.Builder builder = PeerPressureVertexProgram.build()
.property(this.clusterProperty)
.maxIterations(this.times)
- .edges(detachedTraversal)
- .create(graph);
+ .edges(detachedTraversal);
+ if (this.previousTraversalVertexProgram())
+ builder.initialVoteStrength(new HaltedTraversersCountTraversal());
+ return builder.create(graph);
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/EmptyMemory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/EmptyMemory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/EmptyMemory.java
index 72b1bbf..f513ee6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/EmptyMemory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/EmptyMemory.java
@@ -21,19 +21,71 @@ package org.apache.tinkerpop.gremlin.process.computer.util;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import java.util.Collections;
+import java.util.Set;
+
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class EmptyMemory {
+public final class EmptyMemory implements Memory.Admin {
- private static final Memory INSTANCE = new ImmutableMemory(new MapMemory());
+ private static final EmptyMemory INSTANCE = new EmptyMemory();
private EmptyMemory() {
}
- public static Memory instance() {
+ public static EmptyMemory instance() {
return INSTANCE;
}
+ @Override
+ public void setIteration(final int iteration) {
+ throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+ }
+
+ @Override
+ public void setRuntime(final long runtime) {
+ throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+ }
+
+ @Override
+ public Memory asImmutable() {
+ return this;
+ }
+
+ @Override
+ public Set<String> keys() {
+ return Collections.emptySet();
+ }
+
+ @Override
+ public <R> R get(final String key) throws IllegalArgumentException {
+ throw Memory.Exceptions.memoryDoesNotExist(key);
+ }
+
+ @Override
+ public void set(final String key, final Object value) throws IllegalArgumentException, IllegalStateException {
+ throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+ }
+
+ @Override
+ public void add(final String key, final Object value) throws IllegalArgumentException, IllegalStateException {
+ throw Memory.Exceptions.memoryIsCurrentlyImmutable();
+ }
+
+ @Override
+ public int getIteration() {
+ return 0;
+ }
+
+ @Override
+ public long getRuntime() {
+ return 0;
+ }
+
+ @Override
+ public boolean exists(final String key) {
+ return false;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/SingleMessenger.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/SingleMessenger.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/SingleMessenger.java
new file mode 100644
index 0000000..63ee282
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/SingleMessenger.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.computer.util;
+
+import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
+import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.Iterator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SingleMessenger<M> implements Messenger<M> {
+
+ private final Messenger<M> baseMessenger;
+ private final M message;
+
+ public SingleMessenger(final Messenger<M> baseMessenger, final M message) {
+ this.baseMessenger = baseMessenger;
+ this.message = message;
+ }
+
+ @Override
+ public Iterator<M> receiveMessages() {
+ return IteratorUtils.of(this.message);
+ }
+
+ @Override
+ public void sendMessage(final MessageScope messageScope, final M message) {
+ this.baseMessenger.sendMessage(messageScope, message);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPeerPressureTest.groovy
----------------------------------------------------------------------
diff --git a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPeerPressureTest.groovy b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPeerPressureTest.groovy
index eb1ad67..6ec0750 100644
--- a/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPeerPressureTest.groovy
+++ b/gremlin-groovy-test/src/main/groovy/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroovyPeerPressureTest.groovy
@@ -39,5 +39,10 @@ public abstract class GroovyPeerPressureTest {
public Traversal<Vertex, Map<Object, Number>> get_g_V_peerPressure_byXclusterX_byXoutEXknowsXX_pageRankX1X_byXrankX_byXoutEXknowsXX_timesX2X_group_byXclusterX_byXrank_sumX_limitX100X() {
new ScriptTraversal<>(g, "gremlin-groovy", "g.V.peerPressure.by('cluster').by(outE('knows')).pageRank(1.0).by('rank').by(outE('knows')).times(1).group.by('cluster').by(values('rank').sum).limit(100)")
}
+
+ @Override
+ public Traversal<Vertex, Map<String, List<Object>>> get_g_V_hasXname_rippleX_inXcreatedX_peerPressure_byXoutEX_byXclusterX_repeatXunionXidentity__bothX_timesX2X_dedup_valueMapXname_clusterX() {
+ new ScriptTraversal<>(g, "gremlin-groovy", "g.V.has('name', 'ripple').in('created').peerPressure.by(outE()).by('cluster').repeat(union(identity(), both())).times(2).dedup.valueMap('name', 'cluster')")
+ }
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java
index 21c4f43..5a2477c 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/PeerPressureTest.java
@@ -27,7 +27,11 @@ import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.junit.Test;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
import java.util.Map;
+import java.util.Set;
import static org.apache.tinkerpop.gremlin.LoadGraphWith.GraphData.MODERN;
import static org.junit.Assert.assertEquals;
@@ -43,6 +47,7 @@ public abstract class PeerPressureTest extends AbstractGremlinProcessTest {
public abstract Traversal<Vertex, Map<Object, Number>> get_g_V_peerPressure_byXclusterX_byXoutEXknowsXX_pageRankX1X_byXrankX_byXoutEXknowsXX_timesX2X_group_byXclusterX_byXrank_sumX_limitX100X();
+ public abstract Traversal<Vertex, Map<String, List<Object>>> get_g_V_hasXname_rippleX_inXcreatedX_peerPressure_byXoutEX_byXclusterX_repeatXunionXidentity__bothX_timesX2X_dedup_valueMapXname_clusterX();
@Test
@LoadGraphWith(MODERN)
@@ -72,6 +77,27 @@ public abstract class PeerPressureTest extends AbstractGremlinProcessTest {
assertEquals(0.0d, (double) map.get(convertToVertexId("peter")), 0.001d);
}
+ @Test
+ @LoadGraphWith(MODERN)
+ public void g_V_hasXname_rippleX_inXcreatedX_peerPressure_byXoutEX_byXclusterX_repeatXunionXidentity__bothX_timesX2X_dedup_valueMapXname_clusterX() {
+ final Traversal<Vertex, Map<String, List<Object>>> traversal = get_g_V_hasXname_rippleX_inXcreatedX_peerPressure_byXoutEX_byXclusterX_repeatXunionXidentity__bothX_timesX2X_dedup_valueMapXname_clusterX();
+ printTraversalForm(traversal);
+ final List<Map<String, List<Object>>> results = traversal.toList();
+ assertEquals(6, results.size());
+ final Map<String, Object> clusters = new HashMap<>();
+ results.forEach(m -> clusters.put((String) m.get("name").get(0), m.get("cluster").get(0)));
+ assertEquals(2, results.get(0).size());
+ assertEquals(6, clusters.size());
+ assertEquals(clusters.get("josh"), clusters.get("ripple"));
+ assertEquals(clusters.get("josh"), clusters.get("lop"));
+ final Set<Object> ids = new HashSet<>(clusters.values());
+ assertEquals(4, ids.size());
+ assertTrue(ids.contains(convertToVertexId("marko")));
+ assertTrue(ids.contains(convertToVertexId("vadas")));
+ assertTrue(ids.contains(convertToVertexId("josh")));
+ assertTrue(ids.contains(convertToVertexId("peter")));
+ }
+
public static class Traversals extends PeerPressureTest {
@@ -84,5 +110,10 @@ public abstract class PeerPressureTest extends AbstractGremlinProcessTest {
public Traversal<Vertex, Map<Object, Number>> get_g_V_peerPressure_byXclusterX_byXoutEXknowsXX_pageRankX1X_byXrankX_byXoutEXknowsXX_timesX2X_group_byXclusterX_byXrank_sumX_limitX100X() {
return g.V().peerPressure().by("cluster").by(__.outE("knows")).pageRank(1.0d).by("rank").by(__.outE("knows")).times(1).<Object, Number>group().by("cluster").by(__.values("rank").sum()).limit(100);
}
+
+ @Override
+ public Traversal<Vertex, Map<String, List<Object>>> get_g_V_hasXname_rippleX_inXcreatedX_peerPressure_byXoutEX_byXclusterX_repeatXunionXidentity__bothX_timesX2X_dedup_valueMapXname_clusterX() {
+ return g.V().has("name", "ripple").in("created").peerPressure().by(__.outE()).by("cluster").repeat(__.union(__.identity(), __.both())).times(2).dedup().valueMap("name", "cluster");
+ }
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
index 8b2a293..5f548da 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/ProgramTest.java
@@ -27,6 +27,7 @@ import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MemoryComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
+import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
import org.apache.tinkerpop.gremlin.process.computer.VertexComputeKey;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
import org.apache.tinkerpop.gremlin.process.computer.ranking.pagerank.PageRankVertexProgram;
@@ -187,7 +188,7 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
@Override
public void setup(final Memory memory) {
- MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.SETUP);
+ MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.SETUP);
final Map<Vertex, Long> map = (Map<Vertex, Long>) this.haltedTraversers.iterator().next().get();
assertEquals(2, map.size());
assertTrue(map.values().contains(3l));
@@ -205,7 +206,7 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
public void execute(final Vertex vertex, final Messenger messenger, final Memory memory) {
assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
final TraverserGenerator generator = this.traversal.get().getTraverserGenerator();
- MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.EXECUTE);
+ MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.EXECUTE);
this.checkSideEffects();
final TraverserSet<Vertex> activeTraversers = memory.get(TraversalVertexProgram.ACTIVE_TRAVERSERS);
if (vertex.label().equals("software")) {
@@ -233,7 +234,7 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
@Override
public boolean terminate(final Memory memory) {
final TraverserGenerator generator = this.traversal.get().getTraverserGenerator();
- MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.TERMINATE);
+ MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.TERMINATE);
checkSideEffects();
if (memory.isInitialIteration()) {
assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
@@ -254,14 +255,14 @@ public abstract class ProgramTest extends AbstractGremlinProcessTest {
assertNotNull(this.haltedTraversers);
this.haltedTraversers.clear();
assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
- MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.WORKER_ITERATION_START);
+ MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.WORKER_ITERATION_START);
checkSideEffects();
}
@Override
public void workerIterationEnd(final Memory memory) {
assertFalse(memory.exists(TraversalVertexProgram.HALTED_TRAVERSERS));
- MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, MemoryTraversalSideEffects.State.WORKER_ITERATION_END);
+ MemoryTraversalSideEffects.setMemorySideEffects(this.traversal.get(), memory, ProgramPhase.WORKER_ITERATION_END);
checkSideEffects();
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/5bcf0b01/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
index 6e35cf8..768d10a 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/traversal/strategy/optimization/interceptor/SparkStarBarrierInterceptor.java
@@ -22,6 +22,7 @@ package org.apache.tinkerpop.gremlin.spark.process.computer.traversal.strategy.o
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
+import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
import org.apache.tinkerpop.gremlin.process.computer.traversal.MemoryTraversalSideEffects;
import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
import org.apache.tinkerpop.gremlin.process.traversal.NumberHelper;
@@ -75,7 +76,7 @@ public final class SparkStarBarrierInterceptor implements SparkVertexProgramInte
traversal.applyStrategies(); // compile
boolean identityTraversal = traversal.getSteps().isEmpty(); // if the traversal is empty, just return the vertex (fast)
///////////////////////////////
- MemoryTraversalSideEffects.setMemorySideEffects(traversal, memory, MemoryTraversalSideEffects.State.EXECUTE); // any intermediate sideEffect steps are backed by SparkMemory
+ MemoryTraversalSideEffects.setMemorySideEffects(traversal, memory, ProgramPhase.EXECUTE); // any intermediate sideEffect steps are backed by SparkMemory
memory.setInExecute(true);
final JavaRDD<Traverser.Admin<Object>> nextRDD = inputRDD.values()
.filter(vertexWritable -> ElementHelper.idExists(vertexWritable.get().id(), graphStepIds)) // ensure vertex ids are in V(x)