You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/13 18:56:44 UTC
[28/50] [abbrv] tinkerpop git commit: more organization and cleaning
and learning.... next up Host and Proxy.
more organization and cleaning and learning.... next up Host and Proxy.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/ca0acf70
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/ca0acf70
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/ca0acf70
Branch: refs/heads/TINKERPOP-1564
Commit: ca0acf70d8ebcea04665c171d86d77ac0b449e91
Parents: 5d1f438
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 12 11:13:48 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jan 13 11:55:55 2017 -0700
----------------------------------------------------------------------
.../akka/process/actors/AkkaConfigFactory.java | 2 +-
.../akka/process/actors/AkkaGraphActors.java | 25 ++++++--------------
.../akka/process/actors/MasterActor.java | 16 +++++++------
.../akka/process/actors/WorkerActor.java | 8 ++++---
.../src/main/resources/application.conf | 4 ++--
.../util/partitioner/GlobalPartitioner.java | 2 --
6 files changed, 24 insertions(+), 33 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ca0acf70/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
index 7a7c958..ad6a3d6 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
@@ -72,6 +72,6 @@ final class AkkaConfigFactory {
static Address getWorkerActorDeployment(final Partition partition) {
final String location = partition.location().isSiteLocalAddress() ? "127.0.0.1" : partition.location().getHostAddress().toString();
- return AddressFromURIString.parse("akka.tcp://traversal@" + location + ":2552");
+ return AddressFromURIString.parse("akka.tcp://tinkerpop@" + location + ":2552");
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ca0acf70/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
index 0e80924..d967aed 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
@@ -28,7 +28,6 @@ import org.apache.commons.configuration.Configuration;
import org.apache.commons.configuration.ConfigurationUtils;
import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
-import org.apache.tinkerpop.gremlin.process.actors.Address;
import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
import org.apache.tinkerpop.gremlin.process.actors.util.DefaultActorsResult;
import org.apache.tinkerpop.gremlin.process.actors.util.GraphActorsHelper;
@@ -36,8 +35,6 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration;
-import java.net.InetAddress;
-import java.net.UnknownHostException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
@@ -47,7 +44,6 @@ import java.util.concurrent.Future;
public final class AkkaGraphActors<R> implements GraphActors<R> {
private ActorProgram actorProgram;
- private int workers = 1;
private Configuration configuration;
private boolean executed = false;
@@ -71,7 +67,6 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
@Override
public GraphActors<R> workers(final int workers) {
- this.workers = workers;
this.configuration.setProperty(GRAPH_ACTORS_WORKERS, workers);
return this;
}
@@ -88,20 +83,14 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
throw new IllegalStateException("Can not execute twice");
this.executed = true;
- final ActorSystem system = ActorSystem.create("traversal", AkkaConfigFactory.generateAkkaConfig(this.actorProgram));
+ final ActorSystem system = ActorSystem.create("tinkerpop", AkkaConfigFactory.generateAkkaConfig(this.actorProgram));
final ActorsResult<R> result = new DefaultActorsResult<>();
- try {
- final Configuration programConfiguration = new SerializableConfiguration(this.configuration);
- ConfigurationUtils.copy(graph.configuration(), programConfiguration);
- ///////
- final akka.actor.Address masterAddress = AkkaConfigFactory.getMasterActorDeployment();
- new Address.Master(system.actorOf(
- Props.create(MasterActor.class, programConfiguration, result).withDeploy(new Deploy(new RemoteScope(masterAddress))),
- "master").path().toString(),
- InetAddress.getByName(masterAddress.host().get()));
- } catch (final UnknownHostException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
+ final Configuration finalConfiguration = new SerializableConfiguration(this.configuration);
+ ConfigurationUtils.copy(graph.configuration(), finalConfiguration);
+ ///////
+ final akka.actor.Address masterAddress = AkkaConfigFactory.getMasterActorDeployment();
+ system.actorOf(Props.create(MasterActor.class, finalConfiguration, result).withDeploy(new Deploy(new RemoteScope(masterAddress))), "master");
+
return CompletableFuture.supplyAsync(() -> {
while (!system.isTerminated()) {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ca0acf70/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
index 3231645..4fbfd94 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.akka.process.actors;
import akka.actor.AbstractActor;
import akka.actor.ActorSelection;
-import akka.actor.AddressFromURIString;
import akka.actor.Deploy;
import akka.actor.Props;
import akka.dispatch.RequiresMessageQueue;
@@ -32,6 +31,7 @@ import org.apache.tinkerpop.gremlin.process.actors.Actor;
import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
import org.apache.tinkerpop.gremlin.process.actors.Address;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Partition;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
@@ -60,7 +60,8 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
public MasterActor(final Configuration configuration, final ActorsResult<?> result) {
final Graph graph = GraphFactory.open(configuration);
final ActorProgram actorProgram = ActorProgram.createActorProgram(graph, configuration);
- this.partitioner = new HashPartitioner(graph.partitioner(), 5);
+ final int workers = configuration.getInt(GraphActors.GRAPH_ACTORS_WORKERS, 1);
+ this.partitioner = workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), workers);
this.result = result;
try {
this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost());
@@ -70,11 +71,12 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
this.workers = new ArrayList<>();
final List<Partition> partitions = partitioner.getPartitions();
for (final Partition partition : partitions) {
- final String workerPathString = "worker-" + partition.id();
- this.workers.add(new Address.Worker(workerPathString, partition.location()));
- context().actorOf(Props.create(WorkerActor.class, configuration, this.workers.size()-1, this.master)
- .withDeploy(new Deploy(new RemoteScope(AkkaConfigFactory.getWorkerActorDeployment(partition)))),
- workerPathString);
+ final Address.Worker workerAddress = new Address.Worker("worker-" + partition.id(), partition.location());
+ this.workers.add(workerAddress);
+ context().actorOf(
+ Props.create(WorkerActor.class, configuration, partition.id(), this.master)
+ .withDeploy(new Deploy(new RemoteScope(AkkaConfigFactory.getWorkerActorDeployment(partition)))),
+ workerAddress.getId());
}
this.masterProgram = actorProgram.createMasterProgram(this);
receive(ReceiveBuilder.matchAny(this.masterProgram::execute).build());
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ca0acf70/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java
index f90f081..02c4e4c 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java
@@ -27,6 +27,7 @@ import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.actors.Actor;
import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
import org.apache.tinkerpop.gremlin.process.actors.Address;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Partition;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
@@ -51,11 +52,12 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
private final List<Address.Worker> workers;
private final Map<Address, ActorSelection> actors = new HashMap<>();
- public WorkerActor(final Configuration configuration, final int workerIndex, final Address.Master master) {
+ public WorkerActor(final Configuration configuration, final String partitionId, final Address.Master master) {
final Graph graph = GraphFactory.open(configuration);
final ActorProgram actorProgram = ActorProgram.createActorProgram(graph, configuration);
- this.partitioner = new HashPartitioner(graph.partitioner(), 5);
- this.localPartition = this.partitioner.getPartitions().get(workerIndex);
+ final int workers = configuration.getInt(GraphActors.GRAPH_ACTORS_WORKERS, 1);
+ this.partitioner = workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), workers);
+ this.localPartition = this.partitioner.getPartition(partitionId);
this.self = new Address.Worker(this.createWorkerAddress(this.localPartition), this.localPartition.location());
this.master = master;
this.workers = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ca0acf70/akka-gremlin/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf
index d722ee3..6d1915f 100644
--- a/akka-gremlin/src/main/resources/application.conf
+++ b/akka-gremlin/src/main/resources/application.conf
@@ -16,8 +16,8 @@ akka {
}
cluster {
seed-nodes = [
- "akka.tcp://traversal@127.0.0.1:2552"
- "akka.tcp://traversal@127.0.0.1:2552"]
+ "akka.tcp://tinkerpop@127.0.0.1:2552"
+ "akka.tcp://tinkerpop@127.0.0.1:2552"]
auto-down-unreachable-after = 10s
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ca0acf70/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
index af04dbe..73962d3 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
@@ -33,7 +33,6 @@ import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Collections;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -78,7 +77,6 @@ public final class GlobalPartitioner implements Partitioner {
private class GlobalPartition implements Partition {
private final GlobalPartitioner partitioner;
- private final Map<String, Object> configuration = new HashMap<>();
private final String id;
private final InetAddress location;