You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/25 14:08:28 UTC
tinkerpop git commit: javadoc'ing and code reorganization.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 b81dbc3b7 -> 12cce2b82
javadoc'ing and code reorganization.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/12cce2b8
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/12cce2b8
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/12cce2b8
Branch: refs/heads/TINKERPOP-1564
Commit: 12cce2b8249a936ea0db8245dc8e044b2f3f7e59
Parents: b81dbc3
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 25 07:08:24 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 25 07:08:24 2017 -0700
----------------------------------------------------------------------
.../tinkerpop/gremlin/process/actors/Actor.java | 18 +++-
.../gremlin/process/actors/ActorProgram.java | 35 ++++---
.../traversal/TraversalMasterProgram.java | 14 ++-
.../traversal/TraversalWorkerProgram.java | 96 ++++++++++----------
.../traversal/step/filter/DedupGlobalStep.java | 2 +-
5 files changed, 102 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/12cce2b8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
index 0f75e20..aec8632 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
@@ -36,7 +36,7 @@ import java.util.List;
public interface Actor {
/**
- * Get the {@link Address} of the actors.
+ * Get the {@link Address} of the actor.
*
* @return the actor's address
*/
@@ -81,14 +81,30 @@ public interface Actor {
*/
public Partitioner partitioner();
+ /**
+ * The master actor is responsible for yielding the final result of the computation.
+ *
+ * @param result the final result of the computation
+ */
public void setResult(final R result);
}
public interface Worker extends Actor {
+ /**
+ * Get the worker actor's address.
+ *
+ * @return the worker actor's address
+ */
+ @Override
public Address.Worker address();
+ /**
+ * Get the address of the worker's master actor.
+ *
+ * @return the master actor's address
+ */
public Address.Master master();
/**
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/12cce2b8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
index 4063090..d1db2e1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
@@ -114,46 +114,59 @@ public interface ActorProgram<R> extends Cloneable {
}
/**
- * The Worker program is executed by a worker process in the {@link GraphActors} system.
+ * The Master program is executed by the master process in the {@link GraphActors} system.
* There are many workers and a single master.
- * All workers execute the same program.
*
- * @param <M> The message type accepted by the worker
+ * @param <M> The message type accepted by the master
*/
- public static interface Worker<M> {
-
+ public static interface Master<M> {
/**
- * This method is evaluated when the worker process is spawned.
+ * This method is evaluated when the master actor is spawned.
*/
public void setup();
/**
- * This method is evaluated when the worker receives a new message.
+ * This method is evaluated when the master actor receives a new message.
*
* @param message the received message
*/
public void execute(final M message);
/**
- * This method is evaluated when the worker process is destroyed.
+ * This method is evaluated when the master actor is destroyed.
*/
public void terminate();
}
+
/**
- * The Master program is executed by the master process in the {@link GraphActors} system.
+ * The Worker program is executed by a worker process in the {@link GraphActors} system.
* There are many workers and a single master.
+ * All workers execute the same program.
*
- * @param <M> The message type accepted by the master
+ * @param <M> The message type accepted by the worker
*/
- public static interface Master<M> {
+ public static interface Worker<M> {
+
+ /**
+ * This method is evaluated when the worker actor is spawned.
+ */
public void setup();
+ /**
+ * This method is evaluated when the worker actor receives a new message.
+ *
+ * @param message the received message
+ */
public void execute(final M message);
+ /**
+ * This method is evaluated when the worker actor is destroyed.
+ */
public void terminate();
}
+
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/12cce2b8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index 0581ba5..7e5340d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser;
@@ -198,7 +199,7 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
this.barriersDone = false;
} else {
while (step.hasNext()) {
- this.processTraverser(step.next());
+ this.sendTraverser(step.next());
}
}
}
@@ -210,11 +211,18 @@ final class TraversalMasterProgram<R> implements ActorProgram.Master<Object> {
this.traverserResults.add(traverser);
return;
}
+ //////
this.voteToHalt = false;
- if (traverser.get() instanceof Element)
+ if (this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep) {
+ // mid-traversal V()/E() traversers need to be broadcasted across all workers/partitions
+ for (final Address.Worker worker : this.master.workers()) {
+ this.master.send(worker, traverser);
+ }
+ } else if (traverser.get() instanceof Element) {
this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().find((Element) traverser.get())), this.detachTraverser(traverser));
- else
+ } else {
this.master.send(this.master.address(), this.detachTraverser(traverser));
+ }
}
private void orderBarrier(final Step step) {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/12cce2b8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
index 31d8a0f..2f86e58 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -48,7 +48,7 @@ import java.util.Map;
*/
final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
- private final Actor.Worker self;
+ private final Actor.Worker worker;
private final TraversalMatrix<?, ?> matrix;
private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
//
@@ -56,10 +56,10 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
private boolean voteToHalt = true;
private Map<String, Barrier> barriers = new HashMap<>();
- public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal) {
- this.self = self;
+ public TraversalWorkerProgram(final Actor.Worker worker, final Traversal.Admin<?, ?> traversal) {
+ this.worker = worker;
// create a pass-through side-effects which sends SideEffectAddMessages to the master actor
- final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
+ final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.worker);
TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
this.matrix = new TraversalMatrix<>(traversal);
// configure distributing and pushing semantics for worker execution
@@ -69,12 +69,12 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
TraversalHelper.getStepsOfAssignableClassRecursively(GraphStep.class, this.matrix.getTraversal()).forEach(graphStep -> {
if (0 == graphStep.getIds().length)
graphStep.setIteratorSupplier(graphStep.returnsVertex() ?
- this.self.partition()::vertices :
- this.self.partition()::edges);
+ this.worker.partition()::vertices :
+ this.worker.partition()::edges);
else {
graphStep.setIteratorSupplier(graphStep.returnsVertex() ?
- () -> IteratorUtils.filter(self.partition().vertices(graphStep.getIds()), this.self.partition()::contains) :
- () -> IteratorUtils.filter(self.partition().edges(graphStep.getIds()), this.self.partition()::contains));
+ () -> IteratorUtils.filter(worker.partition().vertices(graphStep.getIds()), this.worker.partition()::contains) :
+ () -> IteratorUtils.filter(worker.partition().edges(graphStep.getIds()), this.worker.partition()::contains));
}
});
}
@@ -82,10 +82,10 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
@Override
public void setup() {
// create termination ring topology
- final int i = this.self.workers().indexOf(this.self.address());
- this.neighborAddress = i == this.self.workers().size() - 1 ? this.self.master() : this.self.workers().get(i + 1);
- for (int j = 0; j < this.self.partition().partitioner().getPartitions().size(); j++) {
- this.partitionToWorkerMap.put(this.self.partition().partitioner().getPartitions().get(j), this.self.workers().get(j));
+ final int i = this.worker.workers().indexOf(this.worker.address());
+ this.neighborAddress = i == this.worker.workers().size() - 1 ? this.worker.master() : this.worker.workers().get(i + 1);
+ for (int j = 0; j < this.worker.partition().partitioner().getPartitions().size(); j++) {
+ this.partitionToWorkerMap.put(this.worker.partition().partitioner().getPartitions().get(j), this.worker.workers().get(j));
}
// once loaded, start processing start step
final Step<?, ?> step = this.matrix.getTraversal().getStartStep();
@@ -97,11 +97,35 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
@Override
public void execute(final Object message) {
if (message instanceof Traverser.Admin) {
- this.processTraverser((Traverser.Admin) message);
+ ////////// PROCESS TRAVERSER //////////
+ final Traverser.Admin traverser = (Traverser.Admin) message;
+ // only mid-traversal V()/E() traversers can be non-locally processed
+ assert !(traverser.get() instanceof Element) ||
+ this.worker.partition().contains((Element) traverser.get()) ||
+ this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep;
+ if (traverser.isHalted()) {
+ // send halted traversers to master
+ this.sendTraverser(traverser);
+ } else {
+ // locally process traverser
+ TraversalActorProgram.attach(traverser, this.worker.partition());
+ final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+ step.addStart(traverser);
+ if (step instanceof Barrier) {
+ this.barriers.put(step.getId(), (Barrier) step);
+ } else {
+ while (step.hasNext()) {
+ this.sendTraverser(step.next());
+ }
+ }
+ }
} else if (message instanceof SideEffectSetMessage) {
- this.matrix.getTraversal().getSideEffects().
- set(((SideEffectSetMessage) message).getKey(), TraversalActorProgram.attach(((SideEffectSetMessage) message).getValue(), this.self.partition()));
+ ////////// UPDATE LOCAL SIDE-EFFECTS //////////
+ this.matrix.getTraversal().getSideEffects().set(
+ ((SideEffectSetMessage) message).getKey(),
+ TraversalActorProgram.attach(((SideEffectSetMessage) message).getValue(), this.worker.partition()));
} else if (message instanceof BarrierDoneMessage) {
+ ////////// FINALIZE BARRIER SYNCHRONIZATION //////////
final Step<?, ?> step = (Step) this.matrix.getStepById(((BarrierDoneMessage) message).getStepId());
if (step instanceof LocalBarrier) { // the worker drains the local barrier
while (step.hasNext()) {
@@ -110,15 +134,16 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
} else
((Barrier) step).done(); // the master drains the global barrier
} else if (message instanceof Terminate) {
+ ////////// DETERMINE TERMINATION CONDITION //////////
final Terminate terminate = (Terminate) message;
if (this.voteToHalt && !this.barriers.isEmpty()) {
for (final Barrier barrier : this.barriers.values()) {
if (barrier instanceof LocalBarrier) {
barrier.processAllStarts();
- this.self.send(this.self.master(), new BarrierAddMessage(barrier));
+ this.worker.send(this.worker.master(), new BarrierAddMessage(barrier));
} else {
while (barrier.hasNextBarrier()) {
- this.self.send(this.self.master(), new BarrierAddMessage(barrier));
+ this.worker.send(this.worker.master(), new BarrierAddMessage(barrier));
}
}
}
@@ -126,7 +151,7 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
this.voteToHalt = false;
}
// use termination token to determine termination condition
- this.self.send(this.neighborAddress, this.voteToHalt ? terminate : Terminate.NO);
+ this.worker.send(this.neighborAddress, this.voteToHalt ? terminate : Terminate.NO);
this.voteToHalt = true;
} else {
throw new IllegalArgumentException("The following message is unknown: " + message);
@@ -140,46 +165,23 @@ final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
//////////////
- private void processTraverser(final Traverser.Admin traverser) {
- // only mid-traversal V()/E() traversers can be non-locally processed
- assert !(traverser.get() instanceof Element) ||
- this.self.partition().contains((Element) traverser.get()) ||
- this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep;
- if (traverser.isHalted()) {
- // send halted traversers to master
- this.sendTraverser(traverser);
- } else {
- // locally process traverser
- TraversalActorProgram.attach(traverser, this.self.partition());
- final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
- step.addStart(traverser);
- if (step instanceof Barrier) {
- this.barriers.put(step.getId(), (Barrier) step);
- } else {
- while (step.hasNext()) {
- this.sendTraverser(step.next());
- }
- }
- }
- }
-
private void sendTraverser(final Traverser.Admin traverser) {
this.voteToHalt = false;
this.detachTraverser(traverser);
if (traverser.isHalted()) {
// send halted traversers to master
- this.self.send(this.self.master(), traverser);
+ this.worker.send(this.worker.master(), traverser);
} else if (this.matrix.getStepById(traverser.getStepId()) instanceof GraphStep) {
// mid-traversal V()/E() traversers need to be broadcasted across all workers/partitions
- for (final Address.Worker worker : this.self.workers()) {
- this.self.send(worker, traverser);
+ for (final Address.Worker worker : this.worker.workers()) {
+ this.worker.send(worker, traverser);
}
- } else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get())) {
+ } else if (traverser.get() instanceof Element && !this.worker.partition().contains((Element) traverser.get())) {
// if the traverser references a non-local element, send the traverser to the appropriate worker/partition
- this.self.send(this.partitionToWorkerMap.get(this.self.partition().partitioner().find((Element) traverser.get())), traverser);
+ this.worker.send(this.partitionToWorkerMap.get(this.worker.partition().partitioner().find((Element) traverser.get())), traverser);
} else {
// if the traverser is local to the worker, send traverser to self
- this.self.send(this.self.address(), traverser);
+ this.worker.send(this.worker.address(), traverser);
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/12cce2b8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
index 7724dae..22fdcf1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/DedupGlobalStep.java
@@ -69,7 +69,7 @@ public final class DedupGlobalStep<S> extends FilterStep<S> implements Traversal
@Override
protected boolean filter(final Traverser.Admin<S> traverser) {
- if (this.pushBased && this.atWorker) return false;
+ if (this.pushBased && this.atWorker) return false; // todo: study why this is needed
traverser.setBulk(1);
if (null == this.dedupLabels) {
return this.duplicateSet.add(TraversalUtil.applyNullable(traverser, this.dedupTraversal));