You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/23 21:24:00 UTC

[04/50] [abbrv] tinkerpop git commit: more alignment between GraphActors and GraphComputer. GraphActors.partitioner().program().submit(). Unlike GraphComputer, there is no Graph.actors(). HOWEVER -- since GraphActors is Partition-centric, perhaps we do G

more alignment between GraphActors and GraphComputer. GraphActors.partitioner().program().submit(). Unlike GraphComputer, there is no Graph.actors(). HOWEVER -- since GraphActors is Partition-centric, perhaps we do Graph.partitioner().actors(). eek. hehe..


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/9477d530
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/9477d530
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/9477d530

Branch: refs/heads/TINKERPOP-1564
Commit: 9477d530bb494816ad88b6c6620137ae749aeea6
Parents: e8e9ca4
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Dec 15 11:59:48 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Jan 23 14:22:53 2017 -0700

----------------------------------------------------------------------
 .../akka/process/actor/AkkaGraphActors.java     | 48 +++++++++++++-------
 .../gremlin/akka/process/actor/MasterActor.java |  7 +++
 .../gremlin/akka/process/actor/WorkerActor.java |  7 +++
 .../tinkerpop/gremlin/process/actor/Actor.java  | 46 ++++++++++++++++---
 .../gremlin/process/actor/GraphActors.java      |  7 ++-
 .../actor/traversal/TraversalActorProgram.java  | 10 ++--
 .../actor/traversal/TraversalMasterProgram.java | 14 ++----
 .../actor/traversal/TraversalWorkerProgram.java | 12 ++---
 .../step/map/TraversalActorProgramStep.java     |  6 +--
 9 files changed, 105 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
index 51747ac..5739369 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
@@ -44,20 +44,12 @@ import java.util.stream.Collectors;
  */
 public final class AkkaGraphActors<R> implements GraphActors<R> {
 
-    private final ActorSystem system;
-    private final Address.Master master;
-    private final ActorsResult<R> result = new DefaultActorsResult<>();
+    private ActorProgram<R> actorProgram;
+    private Partitioner partitioner;
+    private boolean executed = false;
+
+    public AkkaGraphActors() {
 
-    public AkkaGraphActors(final ActorProgram<R> actorProgram, final Partitioner partitioner) {
-        final Config config = ConfigFactory.defaultApplication().
-                withValue("message-priorities",
-                        ConfigValueFactory.fromAnyRef(actorProgram.getMessagePriorities().get().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString()));
-        this.system = ActorSystem.create("traversal-" + UUID.randomUUID(), config);
-        try {
-            this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost());
-        } catch (final UnknownHostException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
     }
 
     @Override
@@ -66,17 +58,39 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
     }
 
     @Override
-    public Address.Master master() {
-        return this.master;
+    public GraphActors<R> program(final ActorProgram<R> actorProgram) {
+        this.actorProgram = actorProgram;
+        return this;
+    }
+
+    @Override
+    public GraphActors<R> partitioner(final Partitioner partitioner) {
+        this.partitioner = partitioner;
+        return this;
     }
 
     @Override
     public Future<R> submit() {
+        if (this.executed)
+            throw new IllegalStateException("Can not execute twice");
+        this.executed = true;
+        final ActorSystem system;
+        final ActorsResult<R> result = new DefaultActorsResult<>();
+
+        final Config config = ConfigFactory.defaultApplication().
+                withValue("message-priorities",
+                        ConfigValueFactory.fromAnyRef(actorProgram.getMessagePriorities().get().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString()));
+        system = ActorSystem.create("traversal-" + UUID.randomUUID(), config);
+        try {
+            new Address.Master(system.actorOf(Props.create(MasterActor.class, actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost());
+        } catch (final UnknownHostException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
         return CompletableFuture.supplyAsync(() -> {
-            while (!this.system.isTerminated()) {
+            while (!system.isTerminated()) {
 
             }
-            return this.result.getResult();
+            return result.getResult();
         });
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
index 11069f2..a4ef639 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
@@ -48,8 +48,10 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
     private final List<Address.Worker> workers;
     private final Map<Address, ActorSelection> actors = new HashMap<>();
     private final ActorsResult<?> result;
+    private final Partitioner partitioner;
 
     public MasterActor(final ActorProgram program, final Partitioner partitioner, final ActorsResult<?> result) {
+        this.partitioner = partitioner;
         this.result = result;
         try {
             this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost());
@@ -93,6 +95,11 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
     }
 
     @Override
+    public Partitioner partitioner() {
+        return this.partitioner;
+    }
+
+    @Override
     public Address.Master address() {
         return this.master;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
index e043c20..35b5a4f 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
@@ -41,6 +41,7 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
 
     private final ActorProgram.Worker workerProgram;
     private final Partition localPartition;
+    private final Partitioner partitioner;
     private final Address.Worker self;
     private final Address.Master master;
     private final List<Address.Worker> workers;
@@ -48,6 +49,7 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
 
     public WorkerActor(final ActorProgram program, final Address.Master master, final Partition localPartition, final Partitioner partitioner) {
         this.localPartition = localPartition;
+        this.partitioner = partitioner;
         this.self = new Address.Worker(this.createWorkerAddress(localPartition), localPartition.location());
         this.master = master;
         this.workers = new ArrayList<>();
@@ -89,6 +91,11 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
     }
 
     @Override
+    public Partitioner partitioner() {
+        return this.partitioner;
+    }
+
+    @Override
     public Address.Worker address() {
         return this.self;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
index e2f596e..5a0b869 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
@@ -20,23 +20,53 @@
 package org.apache.tinkerpop.gremlin.process.actor;
 
 import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
 import java.util.List;
 
 /**
+ * An Actor represents an isolated processing unit that can only be interacted with via messages.
+ * Actors are able to send and receive messages. The {@link GraphActors} framework has two types of actors:
+ * {@link Master} and {@link Worker}. A master actor is not associated with a particular graph {@link Partition}.
+ * Instead, its role is to coordinate the workers and ultimately, yield the final result of the submitted
+ * {@link ActorProgram}.
+ *
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public interface Actor {
 
+    /**
+     * Get the {@link Partitioner} associated with the {@link GraphActors} system.
+     *
+     * @return the partitioner used to partition (logically and/or physically) the {@link org.apache.tinkerpop.gremlin.structure.Graph}
+     */
+    public Partitioner partitioner();
+
+    /**
+     * Get the {@link Address} of the actor.
+     *
+     * @return the actor's address
+     */
     public Address address();
 
+    /**
+     * Get a list of the {@link Address} values of all the workers in {@link GraphActors} system.
+     *
+     * @return the worker's addresses
+     */
+    public List<Address.Worker> workers();
+
+    /**
+     * Send a message from this actor to another actor given their {@link Address}.
+     *
+     * @param toActor the actor to receive the messages
+     * @param message the message being sent
+     * @param <M>     the message type
+     */
     public <M> void send(final Address toActor, final M message);
 
-
     public interface Master extends Actor {
 
-        public List<Address.Worker> workers();
-
         public Address.Master address();
 
         public void close();
@@ -51,10 +81,14 @@ public interface Actor {
 
         public Address.Master master();
 
-        public List<Address.Worker> workers();
-
+        /**
+         * Get the {@link Partition} associated with this worker.
+         * In principle, this is the subset of the {@link org.apache.tinkerpop.gremlin.structure.Graph} that
+         * the worker is "data-local" to.
+         *
+         * @return the worker's partition
+         */
         public Partition partition();
-
     }
 
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
index d018397..c19dbf7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
@@ -20,18 +20,21 @@
 package org.apache.tinkerpop.gremlin.process.actor;
 
 import org.apache.tinkerpop.gremlin.process.Processor;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
 import java.util.concurrent.Future;
 
 /**
  * GraphActors is a message-passing based graph {@link Processor} that is:
- * asynchronous, distributed, partition-bound, and traverser-centric.
+ * asynchronous, distributed, and partition centric.
  *
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public interface GraphActors<R> extends Processor {
 
-    public Address.Master master();
+    public GraphActors<R> program(final ActorProgram<R> program);
+
+    public GraphActors<R> partitioner(final Partitioner partitioner);
 
     public Future<R> submit();
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
index 6bfdff7..b584322 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.process.actor.traversal;
 
 import org.apache.tinkerpop.gremlin.process.actor.Actor;
 import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actor.ActorsResult;
 import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage;
 import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
 import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage;
@@ -41,7 +40,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.Path
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.RepeatUnrollStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -63,11 +61,9 @@ public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet
             Terminate.class);
 
     private Traversal.Admin<?, R> traversal;
-    private final Partitioner partitioner;
     public TraverserSet<R> result = new TraverserSet<>();
 
-    public TraversalActorProgram(final Traversal.Admin<?, R> traversal, final Partitioner partitioner) {
-        this.partitioner = partitioner;
+    public TraversalActorProgram(final Traversal.Admin<?, R> traversal) {
         this.traversal = traversal;
         final TraversalStrategies strategies = this.traversal.getStrategies().clone();
         strategies.addStrategies(ActorVerificationStrategy.instance(), ReadOnlyStrategy.instance());
@@ -89,12 +85,12 @@ public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet
 
     @Override
     public Worker createWorkerProgram(final Actor.Worker worker) {
-        return new TraversalWorkerProgram<>(worker, this.traversal.clone(), this.partitioner);
+        return new TraversalWorkerProgram<>(worker, this.traversal.clone());
     }
 
     @Override
     public Master createMasterProgram(final Actor.Master master) {
-        return new TraversalMasterProgram<>(master, this.traversal.clone(), this.partitioner, this.result);
+        return new TraversalMasterProgram<>(master, this.traversal.clone(), this.result);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
index e15106f..2aaf686 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
@@ -44,7 +44,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Partition;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -54,23 +53,20 @@ import java.util.Map;
  */
 final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
 
-
     private final Actor.Master master;
     private final Traversal.Admin<?, ?> traversal;
     private final TraversalMatrix<?, ?> matrix;
-    private final Partitioner partitioner;
     private Map<String, Barrier> barriers = new HashMap<>();
     private final TraverserSet<?> results;
     private Address.Worker leaderWorker;
     private int orderCounter = -1;
-    private final Map<Partition,Address.Worker> partitionToWorkerMap = new HashMap<>();
+    private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
 
-    public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) {
+    public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final TraverserSet<?> results) {
         this.traversal = traversal;
         // System.out.println("master[created]: " + master.address().getId());
         // System.out.println(this.traversal);
         this.matrix = new TraversalMatrix<>(this.traversal);
-        this.partitioner = partitioner;
         this.results = results;
         this.master = master;
         Distributing.configure(this.traversal, true, true);
@@ -80,8 +76,8 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
     @Override
     public void setup() {
         this.leaderWorker = this.master.workers().get(0);
-        for(int i=0; i<this.partitioner.getPartitions().size(); i++) {
-            this.partitionToWorkerMap.put(this.partitioner.getPartitions().get(i),this.master.workers().get(i));
+        for (int i = 0; i < this.master.partitioner().getPartitions().size(); i++) {
+            this.partitionToWorkerMap.put(this.master.partitioner().getPartitions().get(i), this.master.workers().get(i));
         }
         this.broadcast(StartMessage.instance());
         this.master.send(this.leaderWorker, Terminate.MAYBE);
@@ -167,7 +163,7 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
         if (traverser.isHalted())
             this.results.add(traverser);
         else if (traverser.get() instanceof Element)
-            this.master.send(this.partitionToWorkerMap.get(this.partitioner.getPartition((Element) traverser.get())), traverser);
+            this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element) traverser.get())), traverser);
         else
             this.master.send(this.master.address(), traverser);
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
index ef80f03..001219a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
@@ -31,7 +31,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Bypassing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
@@ -40,7 +39,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Element;
 import org.apache.tinkerpop.gremlin.structure.Partition;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
@@ -54,7 +52,6 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
 
     private final Actor.Worker self;
     private final TraversalMatrix<?, ?> matrix;
-    private final Partitioner partitioner;
     private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
     //
     private Address.Worker neighborWorker;
@@ -63,11 +60,10 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
     private boolean voteToHalt = false;
     private Map<String, Barrier> barriers = new HashMap<>();
 
-    public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
+    public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal) {
         this.self = self;
         // System.out.println("worker[created]: " + this.self.address().getId());
         // set up partition and traversal information
-        this.partitioner = partitioner;
         final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
         TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
         this.matrix = new TraversalMatrix<>(traversal);
@@ -93,8 +89,8 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
         final int i = this.self.workers().indexOf(this.self.address());
         this.neighborWorker = this.self.workers().get(i == this.self.workers().size() - 1 ? 0 : i + 1);
         this.isLeader = i == 0;
-        for (int j = 0; j < this.partitioner.getPartitions().size(); j++) {
-            this.partitionToWorkerMap.put(this.partitioner.getPartitions().get(j), this.self.workers().get(j));
+        for (int j = 0; j < this.self.partitioner().getPartitions().size(); j++) {
+            this.partitionToWorkerMap.put(this.self.partitioner().getPartitions().get(j), this.self.workers().get(j));
         }
     }
 
@@ -167,7 +163,7 @@ final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
         if (traverser.isHalted())
             this.self.send(this.self.master(), traverser);
         else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get()))
-            this.self.send(this.partitionToWorkerMap.get(this.partitioner.getPartition((Element) traverser.get())), traverser);
+            this.self.send(this.partitionToWorkerMap.get(this.self.partitioner().getPartition((Element) traverser.get())), traverser);
         else
             this.self.send(this.self.address(), traverser);
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/9477d530/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
index ba2a08e..5d643af 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
@@ -60,9 +60,9 @@ public final class TraversalActorProgramStep<S, E> extends AbstractStep<E, E> {
         if (this.first) {
             this.first = false;
             try {
-                final GraphActors<TraverserSet<E>> graphActors = this.actorsClass.getConstructor(ActorProgram.class, Partitioner.class).
-                        newInstance(new TraversalActorProgram<E>(this.actorsTraversal, this.partitioner), this.partitioner);
-                graphActors.submit().get().forEach(this.starts::add);
+                final GraphActors<TraverserSet<E>> graphActors = this.actorsClass.newInstance();
+                final ActorProgram<TraverserSet<E>> actorProgram = new TraversalActorProgram<>(this.actorsTraversal);
+                graphActors.partitioner(this.partitioner).program(actorProgram).submit().get().forEach(this.starts::add);
             } catch (final Exception e) {
                 throw new IllegalStateException(e.getMessage(), e);
             }