You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/24 16:26:07 UTC

tinkerpop git commit: So I'm using the official way of getting the final result from an Akka system. Prior to this moment, I assumed the GraphActors code was executing on the same machine as the MasterActor and thus, had direct reference to the result da

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 13596ef01 -> d31535bda


So I'm using the official way of getting the final result from an Akka system. Prior to this moment, I assumed the GraphActors code was executing on the same machine as the MasterActor and thus had a direct reference to the result data in memory. Now, I use Patterns.ask() and follow the appropriate shutdown model (no more deprecated terminate()). Need to flesh out ActorsResult now.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/d31535bd
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/d31535bd
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/d31535bd

Branch: refs/heads/TINKERPOP-1564
Commit: d31535bdaab82c478b848ff7b483da63c55850d8
Parents: 13596ef
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 24 09:26:02 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 24 09:26:02 2017 -0700

----------------------------------------------------------------------
 .../akka/process/actors/ActorMailbox.java       | 24 +++++++++++++++-----
 .../akka/process/actors/AkkaGraphActors.java    | 16 ++++++++-----
 .../akka/process/actors/MasterActor.java        | 14 +++++++-----
 .../tinkerpop/gremlin/process/actors/Actor.java |  2 +-
 .../actors/traversal/TraversalActorProgram.java |  3 +--
 .../traversal/TraversalMasterProgram.java       | 10 +++++---
 .../step/map/TraversalActorProgramStep.java     | 10 +++++---
 .../gremlin/process/actors/GraphActorsTest.java |  2 +-
 .../actors/TestSetupTerminateActorProgram.java  |  2 +-
 9 files changed, 54 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
index d45eda3..7a4bc4d 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
@@ -26,6 +26,7 @@ import akka.dispatch.MailboxType;
 import akka.dispatch.MessageQueue;
 import akka.dispatch.ProducesMessageQueue;
 import com.typesafe.config.Config;
+import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import scala.Option;
@@ -45,8 +46,11 @@ public final class ActorMailbox implements MailboxType, ProducesMessageQueue<Act
     public static class ActorMessageQueue implements MessageQueue, ActorSemantics {
         private final List<Class> messagePriorities;
         private final List<Queue> messages;
+        private ActorsResult result = null;
+        private ActorRef home = null;
         private final Object MUTEX = new Object();
 
+
         public ActorMessageQueue(final List<Class> messagePriorities) {
             this.messagePriorities = messagePriorities;
             this.messages = new ArrayList<>(this.messagePriorities.size());
@@ -63,14 +67,21 @@ public final class ActorMailbox implements MailboxType, ProducesMessageQueue<Act
         public void enqueue(final ActorRef receiver, final Envelope handle) {
             synchronized (MUTEX) {
                 final Object message = handle.message();
-                for (int i = 0; i < this.messagePriorities.size(); i++) {
-                    final Class clazz = this.messagePriorities.get(i);
-                    if (clazz.isInstance(message)) {
-                        this.messages.get(i).offer(message instanceof Traverser ? message : handle);
-                        return;
+                if (message instanceof ActorsResult) {
+                    if (receiver.equals(handle.sender()))
+                        this.result = (ActorsResult) message;
+                    else
+                        this.home = handle.sender();
+                } else {
+                    for (int i = 0; i < this.messagePriorities.size(); i++) {
+                        final Class clazz = this.messagePriorities.get(i);
+                        if (clazz.isInstance(message)) {
+                            this.messages.get(i).offer(message instanceof Traverser ? message : handle);
+                            return;
+                        }
                     }
+                    throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass());
                 }
-                throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass());
             }
         }
 
@@ -108,6 +119,7 @@ public final class ActorMailbox implements MailboxType, ProducesMessageQueue<Act
 
         public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) {
             synchronized (MUTEX) {
+                this.home.tell(this.result, owner);
                 for (final Queue queue : this.messages) {
                     while (!queue.isEmpty()) {
                         final Object m = queue.poll();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
index 12e6c1e..3c5caff 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
@@ -19,23 +19,24 @@
 
 package org.apache.tinkerpop.gremlin.akka.process.actors;
 
+import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Deploy;
 import akka.actor.Props;
+import akka.pattern.Patterns;
 import akka.remote.RemoteScope;
 import org.apache.commons.configuration.BaseConfiguration;
 import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
 import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
 import org.apache.tinkerpop.gremlin.process.actors.util.DefaultActorsResult;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration;
+import scala.compat.java8.FutureConverters;
 
 import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
 /**
@@ -87,17 +88,20 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
         final String systemName = "tinkerpop-" + UUID.randomUUID();
         finalConfiguration.setProperty(Constants.GREMLIN_AKKA_SYSTEM_NAME, systemName);
         final ActorSystem system = ActorSystem.create(systemName, AkkaConfigFactory.generateAkkaConfig(this.actorProgram, finalConfiguration));
-        final ActorsResult<R> result = new DefaultActorsResult<>();
         ///////
-        system.actorOf(Props.create(MasterActor.class, finalConfiguration, result).
+        final ActorRef master = system.actorOf(Props.create(MasterActor.class, finalConfiguration).
                 withDeploy(new Deploy(new RemoteScope(AkkaConfigFactory.getMasterActorDeployment(finalConfiguration)))), "master");
 
-        return CompletableFuture.supplyAsync(() -> {
+
+        return (Future) FutureConverters.toJava(Patterns.ask(master, new DefaultActorsResult<>(), 10000000)).toCompletableFuture();
+
+
+        /*return CompletableFuture.supplyAsync(() -> {
             while (!system.isTerminated()) {
 
             }
             return result.getResult();
-        });
+        });*/
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
index 04d0076..6b1b538 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
@@ -32,6 +32,7 @@ import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
 import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
 import org.apache.tinkerpop.gremlin.process.actors.Address;
 import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.util.DefaultActorsResult;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Partition;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
@@ -54,15 +55,14 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
     private final Address.Master master;
     private final List<Address.Worker> workers;
     private final Map<Address, ActorSelection> actors = new HashMap<>();
-    private final ActorsResult<?> result;
     private final Partitioner partitioner;
 
-    public MasterActor(final Configuration configuration, final ActorsResult<?> result) {
+    public MasterActor(final Configuration configuration) {
         final Graph graph = GraphFactory.open(configuration);
         final ActorProgram actorProgram = ActorProgram.createActorProgram(graph, configuration);
         final int workers = configuration.getInt(GraphActors.GRAPH_ACTORS_WORKERS, 1);
         this.partitioner = workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), workers);
-        this.result = result;
+
         try {
             this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost());
         } catch (final UnknownHostException e) {
@@ -120,12 +120,14 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
     @Override
     public void close() {
         this.context().system().stop(self());
-        context().system().terminate();
+        this.context().system().terminate();
     }
 
     @Override
-    public <R> ActorsResult<R> result() {
-        return (ActorsResult<R>) this.result;
+    public <R> void setResult(final R object) {
+       final ActorsResult<R> result = new DefaultActorsResult<>();
+        result.setResult(object);
+        self().tell(result,self());
     }
 
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
index 5930be3..d48c4d6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
@@ -81,7 +81,7 @@ public interface Actor {
          */
         public Partitioner partitioner();
 
-        public <R> ActorsResult<R> result();
+        public <R> void setResult(final R result);
 
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
index 2921e15..96afea8 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
@@ -72,7 +72,6 @@ public final class TraversalActorProgram<R> implements ActorProgram {
             Terminate.class);
 
     private Traversal.Admin<?, R> traversal;
-    public TraverserSet<R> result = new TraverserSet<>();
 
     public TraversalActorProgram() {
 
@@ -133,7 +132,7 @@ public final class TraversalActorProgram<R> implements ActorProgram {
 
     @Override
     public TraversalActorProgram.Master createMasterProgram(final Actor.Master master) {
-        return new TraversalMasterProgram(master, this.traversal.clone(), this.result);
+        return new TraversalMasterProgram(master, this.traversal.clone());
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index 5fe17ae..6ba0cd0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -68,12 +68,12 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> {
     private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
     private boolean voteToHalt = true;
 
-    public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final TraverserSet<?> results) {
+    public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal) {
         this.traversal = traversal;
         // System.out.println("master[created]: " + master.address().getId());
         // System.out.println(this.traversal);
         this.matrix = new TraversalMatrix<>(this.traversal);
-        this.results = results;
+        this.results = new TraverserSet<>();
         this.master = master;
         Distributing.configure(this.traversal, true, true);
         Pushing.configure(this.traversal, true, false);
@@ -163,7 +163,11 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> {
 
     @Override
     public void terminate() {
-        this.master.result().setResult(Pair.with(this.results, this.traversal.getSideEffects()));
+        final Map<String, Object> sideEffects = new HashMap<>();
+        for (final String key : this.traversal.getSideEffects().keys()) {
+            sideEffects.put(key, this.traversal.getSideEffects().get(key));
+        }
+        this.master.setResult(Pair.with(this.results, sideEffects));
     }
 
     private void broadcast(final Object message) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
index 41d8200..50cf554 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.actors.traversal.step.map;
 
 import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
 import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
 import org.apache.tinkerpop.gremlin.process.actors.traversal.TraversalActorProgram;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -32,6 +33,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.javatuples.Pair;
 
+import java.util.Map;
 import java.util.NoSuchElementException;
 
 /**
@@ -56,11 +58,13 @@ public final class TraversalActorProgramStep<S, E> extends AbstractStep<E, E> {
         if (this.first) {
             this.first = false;
             try {
-                final GraphActors<Pair<TraverserSet<E>, TraversalSideEffects>> graphActors = GraphActors.open(this.graphActorsConfiguration);
+                final GraphActors graphActors = GraphActors.open(this.graphActorsConfiguration);
                 final ActorProgram actorProgram = new TraversalActorProgram<>(this.actorsTraversal);
-                final Pair<TraverserSet<E>, TraversalSideEffects> pair = graphActors.program(actorProgram).submit(this.getTraversal().getGraph().get()).get();
+                final Pair<TraverserSet<E>, Map<String,Object>> pair = (Pair)((ActorsResult)graphActors.program(actorProgram).submit(this.getTraversal().getGraph().get()).get()).getResult();
                 pair.getValue0().forEach(this.starts::add);
-                this.getTraversal().setSideEffects(pair.getValue1());
+                for(final Map.Entry<String,Object> entry : pair.getValue1().entrySet()) {
+                    this.getTraversal().getSideEffects().set(entry.getKey(),entry.getValue());
+                }
             } catch (final Exception e) {
                 throw new IllegalStateException(e.getMessage(), e);
             }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
index 057b0c7..6018e64 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
@@ -78,7 +78,7 @@ public class GraphActorsTest extends AbstractGremlinProcessTest {
 
         for (int i = 1; i < 10; i++) {
             final GraphActors actors = graphProvider.getGraphActors(graph);
-            final List<Integer> counts = (List) actors.workers(i).program(new TestSetupTerminateActorProgram()).submit(graph).get();
+            final List<Integer> counts = (List)((ActorsResult) actors.workers(i).program(new TestSetupTerminateActorProgram()).submit(graph).get()).getResult();
             assertEquals(i, counts.get(0).intValue());
             assertEquals(i, counts.get(1).intValue());
             assertEquals(1, counts.get(2).intValue());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
index 79a37ef..59795ab 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
@@ -107,7 +107,7 @@ class TestSetupTerminateActorProgram implements ActorProgram {
                 assertEquals(this.workerSetup, this.workerTerminate);
                 assertEquals(1, this.masterSetup);
                 assertEquals(0, this.masterTerminate);
-                master.result().setResult(Arrays.asList(
+                master.setResult(Arrays.asList(
                         this.workerSetup,
                         this.workerTerminate,
                         this.masterSetup,