You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/23 21:24:00 UTC
[04/50] [abbrv] tinkerpop git commit: more alignment between
GraphActors and GraphComputer. GraphActors.partitioner().program().submit().
Unlike GraphComputer,
there is no Graph.actors(). HOWEVER -- since GraphActors is Partition-centric,
perhaps we do G
more alignment between GraphActors and GraphComputer. GraphActors.partitioner().program().submit(). Unlike GraphComputer, there is no Graph.actors(). HOWEVER -- since GraphActors is Partition-centric, perhaps we do Graph.partitioner().actors(). eek. hehe..
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/9477d530
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/9477d530
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/9477d530
Branch: refs/heads/TINKERPOP-1564
Commit: 9477d530bb494816ad88b6c6620137ae749aeea6
Parents: e8e9ca4
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Dec 15 11:59:48 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Jan 23 14:22:53 2017 -0700
----------------------------------------------------------------------
.../akka/process/actor/AkkaGraphActors.java | 48 +++++++++++++-------
.../gremlin/akka/process/actor/MasterActor.java | 7 +++
.../gremlin/akka/process/actor/WorkerActor.java | 7 +++
.../tinkerpop/gremlin/process/actor/Actor.java | 46 ++++++++++++++++---
.../gremlin/process/actor/GraphActors.java | 7 ++-
.../actor/traversal/TraversalActorProgram.java | 10 ++--
.../actor/traversal/TraversalMasterProgram.java | 14 ++----
.../actor/traversal/TraversalWorkerProgram.java | 12 ++---
.../step/map/TraversalActorProgramStep.java | 6 +--
9 files changed, 105 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
index 51747ac..5739369 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
@@ -44,20 +44,12 @@ import java.util.stream.Collectors;
*/
public final class AkkaGraphActors<R> implements GraphActors<R> {
- private final ActorSystem system;
- private final Address.Master master;
- private final ActorsResult<R> result = new DefaultActorsResult<>();
+ private ActorProgram<R> actorProgram;
+ private Partitioner partitioner;
+ private boolean executed = false;
+
+ public AkkaGraphActors() {
- public AkkaGraphActors(final ActorProgram<R> actorProgram, final Partitioner partitioner) {
- final Config config = ConfigFactory.defaultApplication().
- withValue("message-priorities",
- ConfigValueFactory.fromAnyRef(actorProgram.getMessagePriorities().get().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString()));
- this.system = ActorSystem.create("traversal-" + UUID.randomUUID(), config);
- try {
- this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost());
- } catch (final UnknownHostException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
}
@Override
@@ -66,17 +58,39 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
}
@Override
- public Address.Master master() {
- return this.master;
+ public GraphActors<R> program(final ActorProgram<R> actorProgram) {
+ this.actorProgram = actorProgram;
+ return this;
+ }
+
+ @Override
+ public GraphActors<R> partitioner(final Partitioner partitioner) {
+ this.partitioner = partitioner;
+ return this;
}
@Override
public Future<R> submit() {
+ if (this.executed)
+ throw new IllegalStateException("Can not execute twice");
+ this.executed = true;
+ final ActorSystem system;
+ final ActorsResult<R> result = new DefaultActorsResult<>();
+
+ final Config config = ConfigFactory.defaultApplication().
+ withValue("message-priorities",
+ ConfigValueFactory.fromAnyRef(actorProgram.getMessagePriorities().get().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString()));
+ system = ActorSystem.create("traversal-" + UUID.randomUUID(), config);
+ try {
+ new Address.Master(system.actorOf(Props.create(MasterActor.class, actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost());
+ } catch (final UnknownHostException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
return CompletableFuture.supplyAsync(() -> {
- while (!this.system.isTerminated()) {
+ while (!system.isTerminated()) {
}
- return this.result.getResult();
+ return result.getResult();
});
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
index 11069f2..a4ef639 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
@@ -48,8 +48,10 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
private final List<Address.Worker> workers;
private final Map<Address, ActorSelection> actors = new HashMap<>();
private final ActorsResult<?> result;
+ private final Partitioner partitioner;
public MasterActor(final ActorProgram program, final Partitioner partitioner, final ActorsResult<?> result) {
+ this.partitioner = partitioner;
this.result = result;
try {
this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost());
@@ -93,6 +95,11 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
}
@Override
+ public Partitioner partitioner() {
+ return this.partitioner;
+ }
+
+ @Override
public Address.Master address() {
return this.master;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
index e043c20..35b5a4f 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
@@ -41,6 +41,7 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
private final ActorProgram.Worker workerProgram;
private final Partition localPartition;
+ private final Partitioner partitioner;
private final Address.Worker self;
private final Address.Master master;
private final List<Address.Worker> workers;
@@ -48,6 +49,7 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
public WorkerActor(final ActorProgram program, final Address.Master master, final Partition localPartition, final Partitioner partitioner) {
this.localPartition = localPartition;
+ this.partitioner = partitioner;
this.self = new Address.Worker(this.createWorkerAddress(localPartition), localPartition.location());
this.master = master;
this.workers = new ArrayList<>();
@@ -89,6 +91,11 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
}
@Override
+ public Partitioner partitioner() {
+ return this.partitioner;
+ }
+
+ @Override
public Address.Worker address() {
return this.self;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
index e2f596e..5a0b869 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
@@ -20,23 +20,53 @@
package org.apache.tinkerpop.gremlin.process.actor;
import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
import java.util.List;
/**
+ * An Actor represents an isolated processing unit that can only be interacted with via messages.
+ * Actors are able to send and receive messages. The {@link GraphActors} framework has two types of actors:
+ * {@link Master} and {@link Worker}. A master actor is not associated with a particular graph {@link Partition}.
+ * Instead, its role is to coordinate the workers and, ultimately, yield the final result of the submitted
+ * {@link ActorProgram}.
+ *
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public interface Actor {
+ /**
+ * Get the {@link Partitioner} associated with the {@link GraphActors} system.
+ *
+ * @return the partitioner used to partition (logically and/or physically) the {@link org.apache.tinkerpop.gremlin.structure.Graph}
+ */
+ public Partitioner partitioner();
+
+ /**
+ * Get the {@link Address} of the actor.
+ *
+ * @return the actor's address
+ */
public Address address();
+ /**
+ * Get a list of the {@link Address} values of all the workers in the {@link GraphActors} system.
+ *
+ * @return the workers' addresses
+ */
+ public List<Address.Worker> workers();
+
+ /**
+ * Send a message from this actor to another actor given their {@link Address}.
+ *
+ * @param toActor the actor to receive the message
+ * @param message the message being sent
+ * @param <M> the message type
+ */
public <M> void send(final Address toActor, final M message);
-
public interface Master extends Actor {
- public List<Address.Worker> workers();
-
public Address.Master address();
public void close();
@@ -51,10 +81,14 @@ public interface Actor {
public Address.Master master();
- public List<Address.Worker> workers();
-
+ /**
+ * Get the {@link Partition} associated with this worker.
+ * In principle, this is the subset of the {@link org.apache.tinkerpop.gremlin.structure.Graph} that
+ * the worker is "data-local" to.
+ *
+ * @return the worker's partition
+ */
public Partition partition();
-
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
index d018397..c19dbf7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
@@ -20,18 +20,21 @@
package org.apache.tinkerpop.gremlin.process.actor;
import org.apache.tinkerpop.gremlin.process.Processor;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
import java.util.concurrent.Future;
/**
* GraphActors is a message-passing based graph {@link Processor} that is:
- * asynchronous, distributed, partition-bound, and traverser-centric.
+ * asynchronous, distributed, and partition-centric.
*
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public interface GraphActors<R> extends Processor {
- public Address.Master master();
+ public GraphActors<R> program(final ActorProgram<R> program);
+
+ public GraphActors<R> partitioner(final Partitioner partitioner);
public Future<R> submit();
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
index 6bfdff7..b584322 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.process.actor.traversal;
import org.apache.tinkerpop.gremlin.process.actor.Actor;
import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actor.ActorsResult;
import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage;
import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage;
@@ -41,7 +40,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.Path
import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.RepeatUnrollStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
import java.util.ArrayList;
import java.util.Arrays;
@@ -63,11 +61,9 @@ public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet
Terminate.class);
private Traversal.Admin<?, R> traversal;
- private final Partitioner partitioner;
public TraverserSet<R> result = new TraverserSet<>();
- public TraversalActorProgram(final Traversal.Admin<?, R> traversal, final Partitioner partitioner) {
- this.partitioner = partitioner;
+ public TraversalActorProgram(final Traversal.Admin<?, R> traversal) {
this.traversal = traversal;
final TraversalStrategies strategies = this.traversal.getStrategies().clone();
strategies.addStrategies(ActorVerificationStrategy.instance(), ReadOnlyStrategy.instance());
@@ -89,12 +85,12 @@ public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet
@Override
public Worker createWorkerProgram(final Actor.Worker worker) {
- return new TraversalWorkerProgram<>(worker, this.traversal.clone(), this.partitioner);
+ return new TraversalWorkerProgram<>(worker, this.traversal.clone());
}
@Override
public Master createMasterProgram(final Actor.Master master) {
- return new TraversalMasterProgram<>(master, this.traversal.clone(), this.partitioner, this.result);
+ return new TraversalMasterProgram<>(master, this.traversal.clone(), this.result);
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
index e15106f..2aaf686 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
@@ -44,7 +44,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Partition;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
import java.util.HashMap;
import java.util.Map;
@@ -54,23 +53,20 @@ import java.util.Map;
*/
final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
-
private final Actor.Master master;
private final Traversal.Admin<?, ?> traversal;
private final TraversalMatrix<?, ?> matrix;
- private final Partitioner partitioner;
private Map<String, Barrier> barriers = new HashMap<>();
private final TraverserSet<?> results;
private Address.Worker leaderWorker;
private int orderCounter = -1;
- private final Map<Partition,Address.Worker> partitionToWorkerMap = new HashMap<>();
+ private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
- public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) {
+ public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final TraverserSet<?> results) {
this.traversal = traversal;
// System.out.println("master[created]: " + master.address().getId());
// System.out.println(this.traversal);
this.matrix = new TraversalMatrix<>(this.traversal);
- this.partitioner = partitioner;
this.results = results;
this.master = master;
Distributing.configure(this.traversal, true, true);
@@ -80,8 +76,8 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
@Override
public void setup() {
this.leaderWorker = this.master.workers().get(0);
- for(int i=0; i<this.partitioner.getPartitions().size(); i++) {
- this.partitionToWorkerMap.put(this.partitioner.getPartitions().get(i),this.master.workers().get(i));
+ for (int i = 0; i < this.master.partitioner().getPartitions().size(); i++) {
+ this.partitionToWorkerMap.put(this.master.partitioner().getPartitions().get(i), this.master.workers().get(i));
}
this.broadcast(StartMessage.instance());
this.master.send(this.leaderWorker, Terminate.MAYBE);
@@ -167,7 +163,7 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
if (traverser.isHalted())
this.results.add(traverser);
else if (traverser.get() instanceof Element)
- this.master.send(this.partitionToWorkerMap.get(this.partitioner.getPartition((Element) traverser.get())), traverser);
+ this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element) traverser.get())), traverser);
else
this.master.send(this.master.address(), traverser);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
index ef80f03..001219a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
@@ -31,7 +31,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
@@ -40,7 +39,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Element;
import org.apache.tinkerpop.gremlin.structure.Partition;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
@@ -54,7 +52,6 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
private final Actor.Worker self;
private final TraversalMatrix<?, ?> matrix;
- private final Partitioner partitioner;
private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
//
private Address.Worker neighborWorker;
@@ -63,11 +60,10 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
private boolean voteToHalt = false;
private Map<String, Barrier> barriers = new HashMap<>();
- public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
+ public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal) {
this.self = self;
// System.out.println("worker[created]: " + this.self.address().getId());
// set up partition and traversal information
- this.partitioner = partitioner;
final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
this.matrix = new TraversalMatrix<>(traversal);
@@ -93,8 +89,8 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
final int i = this.self.workers().indexOf(this.self.address());
this.neighborWorker = this.self.workers().get(i == this.self.workers().size() - 1 ? 0 : i + 1);
this.isLeader = i == 0;
- for (int j = 0; j < this.partitioner.getPartitions().size(); j++) {
- this.partitionToWorkerMap.put(this.partitioner.getPartitions().get(j), this.self.workers().get(j));
+ for (int j = 0; j < this.self.partitioner().getPartitions().size(); j++) {
+ this.partitionToWorkerMap.put(this.self.partitioner().getPartitions().get(j), this.self.workers().get(j));
}
}
@@ -167,7 +163,7 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
if (traverser.isHalted())
this.self.send(this.self.master(), traverser);
else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get()))
- this.self.send(this.partitionToWorkerMap.get(this.partitioner.getPartition((Element) traverser.get())), traverser);
+ this.self.send(this.partitionToWorkerMap.get(this.self.partitioner().getPartition((Element) traverser.get())), traverser);
else
this.self.send(this.self.address(), traverser);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
index ba2a08e..5d643af 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
@@ -60,9 +60,9 @@ public final class TraversalActorProgramStep<S, E> extends AbstractStep<E, E> {
if (this.first) {
this.first = false;
try {
- final GraphActors<TraverserSet<E>> graphActors = this.actorsClass.getConstructor(ActorProgram.class, Partitioner.class).
- newInstance(new TraversalActorProgram<E>(this.actorsTraversal, this.partitioner), this.partitioner);
- graphActors.submit().get().forEach(this.starts::add);
+ final GraphActors<TraverserSet<E>> graphActors = this.actorsClass.newInstance();
+ final ActorProgram<TraverserSet<E>> actorProgram = new TraversalActorProgram<>(this.actorsTraversal);
+ graphActors.partitioner(this.partitioner).program(actorProgram).submit().get().forEach(this.starts::add);
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}