You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/15 01:22:20 UTC
tinkerpop git commit: learned about preStart() and postStop() methods
in Akka Actors. Lined those up with actor.setup() and actor.terminate(), respectively.
JavaDoc here and there... nada much. Done for the night.
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 718d66576 -> f3658a5a8
learned about preStart() and postStop() methods in Akka Actors. Lined those up with actor.setup() and actor.terminate(), respectively. JavaDoc here and there... nada much. Done for the night.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f3658a5a
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f3658a5a
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f3658a5a
Branch: refs/heads/TINKERPOP-1564
Commit: f3658a5a8d21a29a171b36e93ad88236ec25646a
Parents: 718d665
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 14 18:22:14 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 14 18:22:14 2016 -0700
----------------------------------------------------------------------
.../gremlin/akka/process/actor/MasterActor.java | 16 ++++++++++---
.../gremlin/akka/process/actor/WorkerActor.java | 16 ++++++++++---
.../gremlin/process/actor/GraphActors.java | 3 +++
.../actor/traversal/TraversalWorkerProgram.java | 13 ++++------
.../step/map/TraversalActorProgramStep.java | 6 -----
.../gremlin/process/computer/Computer.java | 25 +++++++++++++++++---
6 files changed, 56 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f3658a5a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
index 05bedbc..29cd212 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
@@ -42,6 +42,7 @@ import java.util.Map;
*/
public final class MasterActor extends AbstractActor implements RequiresMessageQueue<ActorMailbox.ActorSemantics>, Actor.Master {
+ private final ActorProgram.Master masterProgram;
private final Address.Master master;
private final List<Address.Worker> workers;
private final Map<Address, ActorSelection> actors = new HashMap<>();
@@ -59,9 +60,18 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
this.workers.add(new Address.Worker(workerPathString, partition.location()));
context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner), workerPathString);
}
- final ActorProgram.Master masterProgram = program.createMasterProgram(this);
- receive(ReceiveBuilder.matchAny(masterProgram::execute).build());
- masterProgram.setup();
+ this.masterProgram = program.createMasterProgram(this);
+ receive(ReceiveBuilder.matchAny(this.masterProgram::execute).build());
+ }
+
+ @Override
+ public void preStart() {
+ this.masterProgram.setup();
+ }
+
+ @Override
+ public void postStop() {
+ this.masterProgram.terminate();
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f3658a5a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
index d83252e..e043c20 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
@@ -39,6 +39,7 @@ import java.util.Map;
*/
public final class WorkerActor extends AbstractActor implements RequiresMessageQueue<ActorMailbox.ActorSemantics>, Actor.Worker {
+ private final ActorProgram.Worker workerProgram;
private final Partition localPartition;
private final Address.Worker self;
private final Address.Master master;
@@ -53,9 +54,18 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
for (final Partition partition : partitioner.getPartitions()) {
this.workers.add(new Address.Worker(this.createWorkerAddress(partition), partition.location()));
}
- final ActorProgram.Worker workerProgram = program.createWorkerProgram(this);
- receive(ReceiveBuilder.matchAny(workerProgram::execute).build());
- workerProgram.setup();
+ this.workerProgram = program.createWorkerProgram(this);
+ receive(ReceiveBuilder.matchAny(this.workerProgram::execute).build());
+ }
+
+ @Override
+ public void preStart() {
+ this.workerProgram.setup();
+ }
+
+ @Override
+ public void postStop() {
+ this.workerProgram.terminate();
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f3658a5a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
index 1b01f36..d018397 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
@@ -24,6 +24,9 @@ import org.apache.tinkerpop.gremlin.process.Processor;
import java.util.concurrent.Future;
/**
+ * GraphActors is a message-passing based graph {@link Processor} that is:
+ * asynchronous, distributed, partition-bound, and traverser-centric.
+ *
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public interface GraphActors<R> extends Processor {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f3658a5a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
index f01c138..ef80f03 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
@@ -52,10 +52,8 @@ import java.util.Map;
*/
final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
-
private final Actor.Worker self;
private final TraversalMatrix<?, ?> matrix;
- private final Partition localPartition;
private final Partitioner partitioner;
private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
//
@@ -70,7 +68,6 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
// System.out.println("worker[created]: " + this.self.address().getId());
// set up partition and traversal information
this.partitioner = partitioner;
- this.localPartition = self.partition();
final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
this.matrix = new TraversalMatrix<>(traversal);
@@ -79,14 +76,14 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
//////
final GraphStep graphStep = (GraphStep) traversal.getStartStep();
if (0 == graphStep.getIds().length)
- ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.localPartition::vertices : this.localPartition::edges);
+ ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.self.partition()::vertices : this.self.partition()::edges);
else {
if (graphStep.returnsVertex())
((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier(
- () -> IteratorUtils.filter(this.localPartition.vertices(graphStep.getIds()), this.localPartition::contains));
+ () -> IteratorUtils.filter(self.partition().vertices(graphStep.getIds()), this.self.partition()::contains));
else
((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier(
- () -> IteratorUtils.filter(this.localPartition.edges(graphStep.getIds()), this.localPartition::contains));
+ () -> IteratorUtils.filter(self.partition().edges(graphStep.getIds()), this.self.partition()::contains));
}
}
@@ -153,7 +150,7 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
//////////////
private void processTraverser(final Traverser.Admin traverser) {
- assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get());
+ assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.self.partition().contains((Element) traverser.get());
final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
step.addStart(traverser);
if (step instanceof Barrier) {
@@ -169,7 +166,7 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
this.voteToHalt = false;
if (traverser.isHalted())
this.self.send(this.self.master(), traverser);
- else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get()))
+ else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get()))
this.self.send(this.partitionToWorkerMap.get(this.partitioner.getPartition((Element) traverser.get())), traverser);
else
this.self.send(this.self.address(), traverser);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f3658a5a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
index de82599..ba2a08e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
@@ -19,10 +19,6 @@
package org.apache.tinkerpop.gremlin.process.actor.traversal.step.map;
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-
import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
import org.apache.tinkerpop.gremlin.process.actor.traversal.TraversalActorProgram;
@@ -44,10 +40,8 @@ public final class TraversalActorProgramStep<S, E> extends AbstractStep<E, E> {
private final Class<? extends GraphActors> actorsClass;
private final Traversal.Admin<S, E> actorsTraversal;
private final Partitioner partitioner;
-
private boolean first = true;
-
public TraversalActorProgramStep(final Traversal.Admin<?, ?> traversal, final Class<? extends GraphActors> actorsClass, final Partitioner partitioner) {
super(traversal);
this.actorsClass = actorsClass;
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f3658a5a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
index d0baec0..9214a5e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
@@ -23,11 +23,14 @@ import org.apache.tinkerpop.gremlin.process.Processor;
import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decoration.VertexProgramStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.function.Function;
@@ -60,11 +63,18 @@ public final class Computer implements Processor.Description<GraphComputer>, Fun
return new Computer(graphComputerClass);
}
+
+ /**
+ * @deprecated As of release 3.3.0, replaced by using {@link Computer#of()}.
+ */
@Deprecated
public static Computer compute() {
return new Computer(GraphComputer.class);
}
+ /**
+ * @deprecated As of release 3.3.0, replaced by using {@link Computer#of(Class)}.
+ */
@Deprecated
public static Computer compute(final Class<? extends GraphComputer> graphComputerClass) {
return new Computer(graphComputerClass);
@@ -159,9 +169,18 @@ public final class Computer implements Processor.Description<GraphComputer>, Fun
@Override
public void addTraversalStrategies(final TraversalSource traversalSource) {
- final VertexProgramStrategy vertexProgramStrategy = new VertexProgramStrategy(this);
- traversalSource.getStrategies().addStrategies(vertexProgramStrategy);
- vertexProgramStrategy.addGraphComputerStrategies(traversalSource);
+ Class<? extends GraphComputer> graphComputerClass;
+ if (this.getGraphComputerClass().equals(GraphComputer.class)) {
+ try {
+ graphComputerClass = this.apply(traversalSource.getGraph()).getClass();
+ } catch (final Exception e) {
+ graphComputerClass = GraphComputer.class;
+ }
+ } else
+ graphComputerClass = this.getGraphComputerClass();
+ final List<TraversalStrategy<?>> graphComputerStrategies = TraversalStrategies.GlobalCache.getStrategies(graphComputerClass).toList();
+ traversalSource.getStrategies().addStrategies(graphComputerStrategies.toArray(new TraversalStrategy[graphComputerStrategies.size()]));
+ traversalSource.getStrategies().addStrategies(new VertexProgramStrategy(this));
}
/////////////////