You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/13 18:56:44 UTC

[28/50] [abbrv] tinkerpop git commit: More organization, cleaning, and learning. Next up: Host and Proxy.

More organization, cleaning, and learning. Next up: Host and Proxy.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/ca0acf70
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/ca0acf70
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/ca0acf70

Branch: refs/heads/TINKERPOP-1564
Commit: ca0acf70d8ebcea04665c171d86d77ac0b449e91
Parents: 5d1f438
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Jan 12 11:13:48 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Jan 13 11:55:55 2017 -0700

----------------------------------------------------------------------
 .../akka/process/actors/AkkaConfigFactory.java  |  2 +-
 .../akka/process/actors/AkkaGraphActors.java    | 25 ++++++--------------
 .../akka/process/actors/MasterActor.java        | 16 +++++++------
 .../akka/process/actors/WorkerActor.java        |  8 ++++---
 .../src/main/resources/application.conf         |  4 ++--
 .../util/partitioner/GlobalPartitioner.java     |  2 --
 6 files changed, 24 insertions(+), 33 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ca0acf70/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
index 7a7c958..ad6a3d6 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaConfigFactory.java
@@ -72,6 +72,6 @@ final class AkkaConfigFactory {
 
     static Address getWorkerActorDeployment(final Partition partition) {
         final String location = partition.location().isSiteLocalAddress() ? "127.0.0.1" : partition.location().getHostAddress().toString();
-        return AddressFromURIString.parse("akka.tcp://traversal@" + location + ":2552");
+        return AddressFromURIString.parse("akka.tcp://tinkerpop@" + location + ":2552");
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ca0acf70/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
index 0e80924..d967aed 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
@@ -28,7 +28,6 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.commons.configuration.ConfigurationUtils;
 import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
 import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
-import org.apache.tinkerpop.gremlin.process.actors.Address;
 import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
 import org.apache.tinkerpop.gremlin.process.actors.util.DefaultActorsResult;
 import org.apache.tinkerpop.gremlin.process.actors.util.GraphActorsHelper;
@@ -36,8 +35,6 @@ import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.config.SerializableConfiguration;
 
-import java.net.InetAddress;
-import java.net.UnknownHostException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 
@@ -47,7 +44,6 @@ import java.util.concurrent.Future;
 public final class AkkaGraphActors<R> implements GraphActors<R> {
 
     private ActorProgram actorProgram;
-    private int workers = 1;
     private Configuration configuration;
     private boolean executed = false;
 
@@ -71,7 +67,6 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
 
     @Override
     public GraphActors<R> workers(final int workers) {
-        this.workers = workers;
         this.configuration.setProperty(GRAPH_ACTORS_WORKERS, workers);
         return this;
     }
@@ -88,20 +83,14 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
             throw new IllegalStateException("Can not execute twice");
         this.executed = true;
 
-        final ActorSystem system = ActorSystem.create("traversal", AkkaConfigFactory.generateAkkaConfig(this.actorProgram));
+        final ActorSystem system = ActorSystem.create("tinkerpop", AkkaConfigFactory.generateAkkaConfig(this.actorProgram));
         final ActorsResult<R> result = new DefaultActorsResult<>();
-        try {
-            final Configuration programConfiguration = new SerializableConfiguration(this.configuration);
-            ConfigurationUtils.copy(graph.configuration(), programConfiguration);
-            ///////
-            final akka.actor.Address masterAddress = AkkaConfigFactory.getMasterActorDeployment();
-            new Address.Master(system.actorOf(
-                    Props.create(MasterActor.class, programConfiguration, result).withDeploy(new Deploy(new RemoteScope(masterAddress))),
-                    "master").path().toString(),
-                    InetAddress.getByName(masterAddress.host().get()));
-        } catch (final UnknownHostException e) {
-            throw new IllegalStateException(e.getMessage(), e);
-        }
+        final Configuration finalConfiguration = new SerializableConfiguration(this.configuration);
+        ConfigurationUtils.copy(graph.configuration(), finalConfiguration);
+        ///////
+        final akka.actor.Address masterAddress = AkkaConfigFactory.getMasterActorDeployment();
+        system.actorOf(Props.create(MasterActor.class, finalConfiguration, result).withDeploy(new Deploy(new RemoteScope(masterAddress))), "master");
+
         return CompletableFuture.supplyAsync(() -> {
             while (!system.isTerminated()) {
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ca0acf70/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
index 3231645..4fbfd94 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.akka.process.actors;
 
 import akka.actor.AbstractActor;
 import akka.actor.ActorSelection;
-import akka.actor.AddressFromURIString;
 import akka.actor.Deploy;
 import akka.actor.Props;
 import akka.dispatch.RequiresMessageQueue;
@@ -32,6 +31,7 @@ import org.apache.tinkerpop.gremlin.process.actors.Actor;
 import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
 import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
 import org.apache.tinkerpop.gremlin.process.actors.Address;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Partition;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
@@ -60,7 +60,8 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
     public MasterActor(final Configuration configuration, final ActorsResult<?> result) {
         final Graph graph = GraphFactory.open(configuration);
         final ActorProgram actorProgram = ActorProgram.createActorProgram(graph, configuration);
-        this.partitioner = new HashPartitioner(graph.partitioner(), 5);
+        final int workers = configuration.getInt(GraphActors.GRAPH_ACTORS_WORKERS, 1);
+        this.partitioner = workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), workers);
         this.result = result;
         try {
             this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost());
@@ -70,11 +71,12 @@ public final class MasterActor extends AbstractActor implements RequiresMessageQ
         this.workers = new ArrayList<>();
         final List<Partition> partitions = partitioner.getPartitions();
         for (final Partition partition : partitions) {
-            final String workerPathString = "worker-" + partition.id();
-            this.workers.add(new Address.Worker(workerPathString, partition.location()));
-            context().actorOf(Props.create(WorkerActor.class, configuration, this.workers.size()-1, this.master)
-                    .withDeploy(new Deploy(new RemoteScope(AkkaConfigFactory.getWorkerActorDeployment(partition)))),
-                    workerPathString);
+            final Address.Worker workerAddress = new Address.Worker("worker-" + partition.id(), partition.location());
+            this.workers.add(workerAddress);
+            context().actorOf(
+                    Props.create(WorkerActor.class, configuration, partition.id(), this.master)
+                            .withDeploy(new Deploy(new RemoteScope(AkkaConfigFactory.getWorkerActorDeployment(partition)))),
+                    workerAddress.getId());
         }
         this.masterProgram = actorProgram.createMasterProgram(this);
         receive(ReceiveBuilder.matchAny(this.masterProgram::execute).build());

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ca0acf70/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java
index f90f081..02c4e4c 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java
@@ -27,6 +27,7 @@ import org.apache.commons.configuration.Configuration;
 import org.apache.tinkerpop.gremlin.process.actors.Actor;
 import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
 import org.apache.tinkerpop.gremlin.process.actors.Address;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Partition;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
@@ -51,11 +52,12 @@ public final class WorkerActor extends AbstractActor implements RequiresMessageQ
     private final List<Address.Worker> workers;
     private final Map<Address, ActorSelection> actors = new HashMap<>();
 
-    public WorkerActor(final Configuration configuration, final int workerIndex, final Address.Master master) {
+    public WorkerActor(final Configuration configuration, final String partitionId, final Address.Master master) {
         final Graph graph = GraphFactory.open(configuration);
         final ActorProgram actorProgram = ActorProgram.createActorProgram(graph, configuration);
-        this.partitioner = new HashPartitioner(graph.partitioner(), 5);
-        this.localPartition = this.partitioner.getPartitions().get(workerIndex);
+        final int workers = configuration.getInt(GraphActors.GRAPH_ACTORS_WORKERS, 1);
+        this.partitioner = workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), workers);
+        this.localPartition = this.partitioner.getPartition(partitionId);
         this.self = new Address.Worker(this.createWorkerAddress(this.localPartition), this.localPartition.location());
         this.master = master;
         this.workers = new ArrayList<>();

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ca0acf70/akka-gremlin/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf
index d722ee3..6d1915f 100644
--- a/akka-gremlin/src/main/resources/application.conf
+++ b/akka-gremlin/src/main/resources/application.conf
@@ -16,8 +16,8 @@ akka {
   }
   cluster {
     seed-nodes = [
-      "akka.tcp://traversal@127.0.0.1:2552"
-      "akka.tcp://traversal@127.0.0.1:2552"]
+      "akka.tcp://tinkerpop@127.0.0.1:2552"
+      "akka.tcp://tinkerpop@127.0.0.1:2552"]
     auto-down-unreachable-after = 10s
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/ca0acf70/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
index af04dbe..73962d3 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/partitioner/GlobalPartitioner.java
@@ -33,7 +33,6 @@ import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -78,7 +77,6 @@ public final class GlobalPartitioner implements Partitioner {
     private class GlobalPartition implements Partition {
 
         private final GlobalPartitioner partitioner;
-        private final Map<String, Object> configuration = new HashMap<>();
         private final String id;
         private final InetAddress location;