You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/24 16:26:07 UTC
tinkerpop git commit: So I'm using the official way of getting the
final result from an Akka system. Prior to this moment,
I assumed the GraphActors code was executing on the same machine as the
MasterActor and thus, had direct reference to the result da
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 13596ef01 -> d31535bda
So I'm using the official way of getting the final result from an Akka system. Prior to this moment, I assumed the GraphActors code was executing on the same machine as the MasterActor and thus, had direct reference to the result data in memory. Now, I use a Pattern.ask() and do the appropriate shutdown (no more depreciated terminate()) model. Need to flush out ActorsResult now.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/d31535bd
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/d31535bd
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/d31535bd
Branch: refs/heads/TINKERPOP-1564
Commit: d31535bdaab82c478b848ff7b483da63c55850d8
Parents: 13596ef
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Jan 24 09:26:02 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 24 09:26:02 2017 -0700
----------------------------------------------------------------------
.../akka/process/actors/ActorMailbox.java | 24 +++++++++++++++-----
.../akka/process/actors/AkkaGraphActors.java | 16 ++++++++-----
.../akka/process/actors/MasterActor.java | 14 +++++++-----
.../tinkerpop/gremlin/process/actors/Actor.java | 2 +-
.../actors/traversal/TraversalActorProgram.java | 3 +--
.../traversal/TraversalMasterProgram.java | 10 +++++---
.../step/map/TraversalActorProgramStep.java | 10 +++++---
.../gremlin/process/actors/GraphActorsTest.java | 2 +-
.../actors/TestSetupTerminateActorProgram.java | 2 +-
9 files changed, 54 insertions(+), 29 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
index d45eda3..7a4bc4d 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
@@ -26,6 +26,7 @@ import akka.dispatch.MailboxType;
import akka.dispatch.MessageQueue;
import akka.dispatch.ProducesMessageQueue;
import com.typesafe.config.Config;
+import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import scala.Option;
@@ -45,8 +46,11 @@ public final class ActorMailbox implements MailboxType, ProducesMessageQueue<Act
public static class ActorMessageQueue implements MessageQueue, ActorSemantics {
private final List<Class> messagePriorities;
private final List<Queue> messages;
+ private ActorsResult result = null;
+ private ActorRef home = null;
private final Object MUTEX = new Object();
+
public ActorMessageQueue(final List<Class> messagePriorities) {
this.messagePriorities = messagePriorities;
this.messages = new ArrayList<>(this.messagePriorities.size());
@@ -63,14 +67,21 @@ public final class ActorMailbox implements MailboxType, ProducesMessageQueue<Act
public void enqueue(final ActorRef receiver, final Envelope handle) {
synchronized (MUTEX) {
final Object message = handle.message();
- for (int i = 0; i < this.messagePriorities.size(); i++) {
- final Class clazz = this.messagePriorities.get(i);
- if (clazz.isInstance(message)) {
- this.messages.get(i).offer(message instanceof Traverser ? message : handle);
- return;
+ if (message instanceof ActorsResult) {
+ if (receiver.equals(handle.sender()))
+ this.result = (ActorsResult) message;
+ else
+ this.home = handle.sender();
+ } else {
+ for (int i = 0; i < this.messagePriorities.size(); i++) {
+ final Class clazz = this.messagePriorities.get(i);
+ if (clazz.isInstance(message)) {
+ this.messages.get(i).offer(message instanceof Traverser ? message : handle);
+ return;
+ }
}
+ throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass());
}
- throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass());
}
}
@@ -108,6 +119,7 @@ public final class ActorMailbox implements MailboxType, ProducesMessageQueue<Act
public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) {
synchronized (MUTEX) {
+ this.home.tell(this.result, owner);
for (final Queue queue : this.messages) {
while (!queue.isEmpty()) {
final Object m = queue.poll();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
index 12e6c1e..3c5caff 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
@@ -19,23 +19,24 @@
package org.apache.tinkerpop.gremlin.akka.process.actors;
+import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Deploy;
import akka.actor.Props;
+import akka.pattern.Patterns;
import akka.remote.RemoteScope;
import org.apache.commons.configuration.BaseConfiguration;
import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
import org.apache.tinkerpop.gremlin.process.actors.util.DefaultActorsResult;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration;
+import scala.compat.java8.FutureConverters;
import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
/**
@@ -87,17 +88,20 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
final String systemName = "tinkerpop-" + UUID.randomUUID();
finalConfiguration.setProperty(Constants.GREMLIN_AKKA_SYSTEM_NAME, systemName);
final ActorSystem system = ActorSystem.create(systemName, AkkaConfigFactory.generateAkkaConfig(this.actorProgram, finalConfiguration));
- final ActorsResult<R> result = new DefaultActorsResult<>();
///////
- system.actorOf(Props.create(MasterActor.class, finalConfiguration, result).
+ final ActorRef master = system.actorOf(Props.create(MasterActor.class, finalConfiguration).
withDeploy(new Deploy(new RemoteScope(AkkaConfigFactory.getMasterActorDeployment(finalConfiguration)))), "master");
- return CompletableFuture.supplyAsync(() -> {
+
+ return (Future) FutureConverters.toJava(Patterns.ask(master, new DefaultActorsResult<>(), 10000000)).toCompletableFuture();
+
+
+ /*return CompletableFuture.supplyAsync(() -> {
while (!system.isTerminated()) {
}
return result.getResult();
- });
+ });*/
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
index 04d0076..6b1b538 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
@@ -32,6 +32,7 @@ import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
import org.apache.tinkerpop.gremlin.process.actors.Address;
import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.util.DefaultActorsResult;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Partition;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
@@ -54,15 +55,14 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
private final Address.Master master;
private final List<Address.Worker> workers;
private final Map<Address, ActorSelection> actors = new HashMap<>();
- private final ActorsResult<?> result;
private final Partitioner partitioner;
- public MasterActor(final Configuration configuration, final ActorsResult<?> result) {
+ public MasterActor(final Configuration configuration) {
final Graph graph = GraphFactory.open(configuration);
final ActorProgram actorProgram = ActorProgram.createActorProgram(graph, configuration);
final int workers = configuration.getInt(GraphActors.GRAPH_ACTORS_WORKERS, 1);
this.partitioner = workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), workers);
- this.result = result;
+
try {
this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost());
} catch (final UnknownHostException e) {
@@ -120,12 +120,14 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
@Override
public void close() {
this.context().system().stop(self());
- context().system().terminate();
+ this.context().system().terminate();
}
@Override
- public <R> ActorsResult<R> result() {
- return (ActorsResult<R>) this.result;
+ public <R> void setResult(final R object) {
+ final ActorsResult<R> result = new DefaultActorsResult<>();
+ result.setResult(object);
+ self().tell(result,self());
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
index 5930be3..d48c4d6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
@@ -81,7 +81,7 @@ public interface Actor {
*/
public Partitioner partitioner();
- public <R> ActorsResult<R> result();
+ public <R> void setResult(final R result);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
index 2921e15..96afea8 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
@@ -72,7 +72,6 @@ public final class TraversalActorProgram<R> implements ActorProgram {
Terminate.class);
private Traversal.Admin<?, R> traversal;
- public TraverserSet<R> result = new TraverserSet<>();
public TraversalActorProgram() {
@@ -133,7 +132,7 @@ public final class TraversalActorProgram<R> implements ActorProgram {
@Override
public TraversalActorProgram.Master createMasterProgram(final Actor.Master master) {
- return new TraversalMasterProgram(master, this.traversal.clone(), this.result);
+ return new TraversalMasterProgram(master, this.traversal.clone());
}
@Override
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
index 5fe17ae..6ba0cd0 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -68,12 +68,12 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> {
private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
private boolean voteToHalt = true;
- public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final TraverserSet<?> results) {
+ public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal) {
this.traversal = traversal;
// System.out.println("master[created]: " + master.address().getId());
// System.out.println(this.traversal);
this.matrix = new TraversalMatrix<>(this.traversal);
- this.results = results;
+ this.results = new TraverserSet<>();
this.master = master;
Distributing.configure(this.traversal, true, true);
Pushing.configure(this.traversal, true, false);
@@ -163,7 +163,11 @@ final class TraversalMasterProgram implements ActorProgram.Master<Object> {
@Override
public void terminate() {
- this.master.result().setResult(Pair.with(this.results, this.traversal.getSideEffects()));
+ final Map<String, Object> sideEffects = new HashMap<>();
+ for (final String key : this.traversal.getSideEffects().keys()) {
+ sideEffects.put(key, this.traversal.getSideEffects().get(key));
+ }
+ this.master.setResult(Pair.with(this.results, sideEffects));
}
private void broadcast(final Object message) {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
index 41d8200..50cf554 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
@@ -21,6 +21,7 @@ package org.apache.tinkerpop.gremlin.process.actors.traversal.step.map;
import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
import org.apache.tinkerpop.gremlin.process.actors.traversal.TraversalActorProgram;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
@@ -32,6 +33,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.javatuples.Pair;
+import java.util.Map;
import java.util.NoSuchElementException;
/**
@@ -56,11 +58,13 @@ public final class TraversalActorProgramStep<S, E> extends AbstractStep<E, E> {
if (this.first) {
this.first = false;
try {
- final GraphActors<Pair<TraverserSet<E>, TraversalSideEffects>> graphActors = GraphActors.open(this.graphActorsConfiguration);
+ final GraphActors graphActors = GraphActors.open(this.graphActorsConfiguration);
final ActorProgram actorProgram = new TraversalActorProgram<>(this.actorsTraversal);
- final Pair<TraverserSet<E>, TraversalSideEffects> pair = graphActors.program(actorProgram).submit(this.getTraversal().getGraph().get()).get();
+ final Pair<TraverserSet<E>, Map<String,Object>> pair = (Pair)((ActorsResult)graphActors.program(actorProgram).submit(this.getTraversal().getGraph().get()).get()).getResult();
pair.getValue0().forEach(this.starts::add);
- this.getTraversal().setSideEffects(pair.getValue1());
+ for(final Map.Entry<String,Object> entry : pair.getValue1().entrySet()) {
+ this.getTraversal().getSideEffects().set(entry.getKey(),entry.getValue());
+ }
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
index 057b0c7..6018e64 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
@@ -78,7 +78,7 @@ public class GraphActorsTest extends AbstractGremlinProcessTest {
for (int i = 1; i < 10; i++) {
final GraphActors actors = graphProvider.getGraphActors(graph);
- final List<Integer> counts = (List) actors.workers(i).program(new TestSetupTerminateActorProgram()).submit(graph).get();
+ final List<Integer> counts = (List)((ActorsResult) actors.workers(i).program(new TestSetupTerminateActorProgram()).submit(graph).get()).getResult();
assertEquals(i, counts.get(0).intValue());
assertEquals(i, counts.get(1).intValue());
assertEquals(1, counts.get(2).intValue());
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/d31535bd/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
index 79a37ef..59795ab 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/TestSetupTerminateActorProgram.java
@@ -107,7 +107,7 @@ class TestSetupTerminateActorProgram implements ActorProgram {
assertEquals(this.workerSetup, this.workerTerminate);
assertEquals(1, this.masterSetup);
assertEquals(0, this.masterTerminate);
- master.result().setResult(Arrays.asList(
+ master.setResult(Arrays.asList(
this.workerSetup,
this.workerTerminate,
this.masterSetup,