You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/10 16:11:22 UTC
[14/50] [abbrv] tinkerpop git commit: okay,
all direct references to akka URIs are removed from gremlin-core. I
have one more thing to do with Message priorities. After that, clean, Javadoc,
etc. Going to take a break for a bit first.
okay, all direct references to akka URIs are removed from gremlin-core. I have one more thing to do with Message priorities. After that, clean, Javadoc, etc. Going to take a break for a bit first.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/1c342d5e
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/1c342d5e
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/1c342d5e
Branch: refs/heads/TINKERPOP-1564
Commit: 1c342d5e5315fa4fea000fa74a0cdeddb4dbbf5d
Parents: fc37e8e
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Tue Dec 13 10:18:12 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Tue Jan 10 08:22:55 2017 -0700
----------------------------------------------------------------------
.../gremlin/akka/process/actor/AkkaActors.java | 13 ++-
.../gremlin/akka/process/actor/MasterActor.java | 84 ++++++++++++++++++
.../process/actor/MasterTraversalActor.java | 84 ------------------
.../gremlin/akka/process/actor/WorkerActor.java | 91 ++++++++++++++++++++
.../process/actor/WorkerTraversalActor.java | 91 --------------------
.../tinkerpop/gremlin/process/actor/Actors.java | 6 +-
.../gremlin/process/actor/Address.java | 11 ++-
.../actor/traversal/TraversalActorProgram.java | 16 ++--
.../actor/traversal/TraversalMasterProgram.java | 19 ++--
.../actor/traversal/TraversalWorkerProgram.java | 32 +++----
.../actor/traversal/step/map/ActorStep.java | 3 +-
11 files changed, 222 insertions(+), 228 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1c342d5e/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
index db024f6..de301c1 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaActors.java
@@ -24,7 +24,6 @@ import akka.actor.Props;
import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
import org.apache.tinkerpop.gremlin.process.actor.Actors;
import org.apache.tinkerpop.gremlin.process.actor.Address;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
import java.util.concurrent.CompletableFuture;
@@ -33,16 +32,16 @@ import java.util.concurrent.Future;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class AkkaActors<S, E> implements Actors<S, E> {
+public final class AkkaActors<R> implements Actors<R> {
- private final ActorProgram actorProgram;
+ private final ActorProgram<R> actorProgram;
private final ActorSystem system;
private final Address.Master master;
- public AkkaActors(final ActorProgram actorProgram, final Partitioner partitioner) {
+ public AkkaActors(final ActorProgram<R> actorProgram, final Partitioner partitioner) {
this.actorProgram = actorProgram;
this.system = ActorSystem.create("traversal-" + actorProgram.hashCode());
- this.master = new Address.Master(this.system.actorOf(Props.create(MasterTraversalActor.class, this.actorProgram, partitioner), "master").path().toString());
+ this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, this.actorProgram, partitioner), "master").path().toString());
}
@Override
@@ -51,12 +50,12 @@ public final class AkkaActors<S, E> implements Actors<S, E> {
}
@Override
- public Future<TraverserSet<E>> submit() {
+ public Future<R> submit() {
return CompletableFuture.supplyAsync(() -> {
while (!this.system.isTerminated()) {
}
- return (TraverserSet) this.actorProgram.getResult();
+ return this.actorProgram.getResult();
});
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1c342d5e/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
new file mode 100644
index 0000000..d7b45fa
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorSelection;
+import akka.actor.Props;
+import akka.dispatch.RequiresMessageQueue;
+import akka.japi.pf.ReceiveBuilder;
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MasterActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Master {
+
+ private final Address.Master master;
+ private final List<Address.Worker> workers;
+ private final Map<Address, ActorSelection> actors = new HashMap<>();
+
+ public MasterActor(final ActorProgram program, final Partitioner partitioner) {
+ this.master = new Address.Master(self().path().toString());
+ this.workers = new ArrayList<>();
+ final List<Partition> partitions = partitioner.getPartitions();
+ for (final Partition partition : partitions) {
+ this.workers.add(new Address.Worker("worker-" + partition.hashCode()));
+ context().actorOf(Props.create(WorkerActor.class, program, partitioner, partition), "worker-" + partition.hashCode());
+ }
+ final ActorProgram.Master masterProgram = program.createMasterProgram(this);
+ receive(ReceiveBuilder.matchAny(masterProgram::execute).build());
+ masterProgram.setup();
+ }
+
+ @Override
+ public <M> void send(final Address toActor, final M message) {
+ ActorSelection actor = this.actors.get(toActor);
+ if (null == actor) {
+ actor = context().actorSelection(toActor.location());
+ this.actors.put(toActor, actor);
+ }
+ actor.tell(message, self());
+ }
+
+ @Override
+ public List<Address.Worker> workers() {
+ return this.workers;
+ }
+
+ @Override
+ public Address.Master address() {
+ return this.master;
+ }
+
+ @Override
+ public void close() {
+ context().system().terminate();
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1c342d5e/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
deleted file mode 100644
index 6799a28..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterTraversalActor.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor;
-
-import akka.actor.AbstractActor;
-import akka.actor.ActorSelection;
-import akka.actor.Props;
-import akka.dispatch.RequiresMessageQueue;
-import akka.japi.pf.ReceiveBuilder;
-import org.apache.tinkerpop.gremlin.process.actor.Actor;
-import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actor.Address;
-import org.apache.tinkerpop.gremlin.structure.Partition;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Master {
-
- private final Address.Master master;
- private final List<Address.Worker> workers;
- private final Map<Address, ActorSelection> actors = new HashMap<>();
-
- public MasterTraversalActor(final ActorProgram program, final Partitioner partitioner) {
- this.master = new Address.Master(self().path().toString());
- this.workers = new ArrayList<>();
- final List<Partition> partitions = partitioner.getPartitions();
- for (final Partition partition : partitions) {
- this.workers.add(new Address.Worker("worker-" + partition.hashCode()));
- context().actorOf(Props.create(WorkerTraversalActor.class, program, partitioner, partition), "worker-" + partition.hashCode());
- }
- final ActorProgram.Master masterProgram = program.createMasterProgram(this);
- receive(ReceiveBuilder.matchAny(masterProgram::execute).build());
- masterProgram.setup();
- }
-
- @Override
- public <M> void send(final Address toActor, final M message) {
- ActorSelection actor = this.actors.get(toActor);
- if (null == actor) {
- actor = context().actorSelection(toActor.location());
- this.actors.put(toActor, actor);
- }
- actor.tell(message, self());
- }
-
- @Override
- public List<Address.Worker> workers() {
- return this.workers;
- }
-
- @Override
- public Address.Master address() {
- return this.master;
- }
-
- @Override
- public void close() {
- context().system().terminate();
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1c342d5e/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
new file mode 100644
index 0000000..84dbe37
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actor;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorSelection;
+import akka.dispatch.RequiresMessageQueue;
+import akka.japi.pf.ReceiveBuilder;
+import org.apache.tinkerpop.gremlin.process.actor.Actor;
+import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actor.Address;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class WorkerActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Worker {
+
+ private final Partition localPartition;
+ private final Address.Worker self;
+ private final Address.Master master;
+ private final List<Address.Worker> workers;
+ private final Map<Address, ActorSelection> actors = new HashMap<>();
+
+ public WorkerActor(final ActorProgram program, final Partitioner partitioner, final Partition localPartition) {
+ this.localPartition = localPartition;
+ this.self = new Address.Worker("../worker-" + localPartition.hashCode());
+ this.master = new Address.Master(context().parent().path().toString());
+ this.workers = new ArrayList<>();
+ for (final Partition partition : partitioner.getPartitions()) {
+ this.workers.add(new Address.Worker("../worker-" + partition.hashCode()));
+ }
+ ActorProgram.Worker workerProgram = program.createWorkerProgram(this);
+ receive(ReceiveBuilder.matchAny(workerProgram::execute).build());
+ workerProgram.setup();
+ }
+
+ @Override
+ public <M> void send(final Address toActor, final M message) {
+ ActorSelection actor = this.actors.get(toActor);
+ if (null == actor) {
+ actor = context().actorSelection(toActor.location());
+ this.actors.put(toActor, actor);
+ }
+ actor.tell(message, self());
+ }
+
+ @Override
+ public List<Address.Worker> workers() {
+ return this.workers;
+ }
+
+ @Override
+ public Partition partition() {
+ return this.localPartition;
+ }
+
+ @Override
+ public Address.Worker address() {
+ return this.self;
+ }
+
+ @Override
+ public Address.Master master() {
+ return this.master;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1c342d5e/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
deleted file mode 100644
index 5a6bae7..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerTraversalActor.java
+++ /dev/null
@@ -1,91 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor;
-
-import akka.actor.AbstractActor;
-import akka.actor.ActorSelection;
-import akka.dispatch.RequiresMessageQueue;
-import akka.japi.pf.ReceiveBuilder;
-import org.apache.tinkerpop.gremlin.process.actor.Actor;
-import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actor.Address;
-import org.apache.tinkerpop.gremlin.structure.Partition;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class WorkerTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics>, Actor.Worker {
-
- private final Partition localPartition;
- private final Address.Worker self;
- private final Address.Master master;
- private final List<Address.Worker> workers;
- private final Map<Address, ActorSelection> actors = new HashMap<>();
-
- public WorkerTraversalActor(final ActorProgram program, final Partitioner partitioner, final Partition localPartition) {
- this.localPartition = localPartition;
- this.self = new Address.Worker(self().path().toString());
- this.master = new Address.Master(context().parent().path().toString());
- this.workers = new ArrayList<>();
- for (final Partition partition : partitioner.getPartitions()) {
- this.workers.add(new Address.Worker("../worker-" + partition.hashCode()));
- }
- ActorProgram.Worker workerProgram = program.createWorkerProgram(this);
- receive(ReceiveBuilder.matchAny(workerProgram::execute).build());
- workerProgram.setup();
- }
-
- @Override
- public <M> void send(final Address toActor, final M message) {
- ActorSelection actor = this.actors.get(toActor);
- if (null == actor) {
- actor = context().actorSelection(toActor.location());
- this.actors.put(toActor, actor);
- }
- actor.tell(message, self());
- }
-
- @Override
- public List<Address.Worker> workers() {
- return this.workers;
- }
-
- @Override
- public Partition partition() {
- return this.localPartition;
- }
-
- @Override
- public Address.Worker address() {
- return this.self;
- }
-
- @Override
- public Address.Master master() {
- return this.master;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1c342d5e/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
index 2e410ec..7b0c4a4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
@@ -19,16 +19,14 @@
package org.apache.tinkerpop.gremlin.process.actor;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-
import java.util.concurrent.Future;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public interface Actors<S, E> {
+public interface Actors<R> {
public Address.Master master();
- public Future<TraverserSet<E>> submit();
+ public Future<R> submit();
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1c342d5e/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
index c598eb7..ff45e30 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
@@ -36,15 +36,22 @@ public abstract class Address implements Serializable {
return this.location;
}
+ @Override
public boolean equals(final Object other) {
return other instanceof Address && ((Address) other).location.equals(this.location);
}
+ @Override
public int hashCode() {
return this.location.hashCode();
}
- public static class Master extends Address {
+ @Override
+ public String toString() {
+ return this.location();
+ }
+
+ public static final class Master extends Address {
public Master(final String location) {
super(location);
@@ -52,7 +59,7 @@ public abstract class Address implements Serializable {
}
- public static class Worker extends Address {
+ public static final class Worker extends Address {
public Worker(final String location) {
super(location);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1c342d5e/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
index 278fb3b..e72b989 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
@@ -33,24 +33,24 @@ import org.apache.tinkerpop.gremlin.structure.Partitioner;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class TraversalActorProgram<M> implements ActorProgram<M> {
+public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet<R>> {
- private final Traversal.Admin<?, ?> traversal;
+ private final Traversal.Admin<?, R> traversal;
private final Partitioner partitioner;
- public TraverserSet<?> result = new TraverserSet<>();
+ public TraverserSet<R> result = new TraverserSet<>();
- public TraversalActorProgram(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
+ public TraversalActorProgram(final Traversal.Admin<?, R> traversal, final Partitioner partitioner) {
this.partitioner = partitioner;
final TraversalStrategies strategies = traversal.getStrategies().clone();
strategies.removeStrategies(ComputerVerificationStrategy.class, StandardVerificationStrategy.class);
strategies.addStrategies(ActorVerificationStrategy.instance());
traversal.setStrategies(strategies);
traversal.applyStrategies();
- this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get();
+ this.traversal = (Traversal.Admin) ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get();
}
@Override
- public Worker<M> createWorkerProgram(final Actor.Worker worker) {
+ public Worker createWorkerProgram(final Actor.Worker worker) {
return new TraversalWorkerProgram<>(worker, this.traversal.clone(), this.partitioner);
}
@@ -60,7 +60,7 @@ public final class TraversalActorProgram<M> implements ActorProgram<M> {
}
@Override
- public M getResult() {
- return (M) this.result;
+ public TraverserSet<R> getResult() {
+ return this.result;
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1c342d5e/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
index 654969b..ba051e2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
@@ -48,32 +48,27 @@ import java.util.Map;
public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
private final Actor.Master master;
- private final Map<String, Address.Worker> workers = new HashMap<>();
private final Traversal.Admin<?, ?> traversal;
private final TraversalMatrix<?, ?> matrix;
private final Partitioner partitioner;
private Map<String, Barrier> barriers = new HashMap<>();
private final TraverserSet<?> results;
- private final String leaderWorker;
+ private Address.Worker leaderWorker;
public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) {
this.traversal = traversal;
- System.out.println("master[created]: " + master.address().location());
- System.out.println(this.traversal);
+ //System.out.println("master[created]: " + master.address().location());
+ //System.out.println(this.traversal);
this.matrix = new TraversalMatrix<>(this.traversal);
this.partitioner = partitioner;
this.results = results;
this.master = master;
- this.leaderWorker = "worker-" + this.partitioner.getPartitions().get(0).hashCode();
}
@Override
public void setup() {
- for (final Address.Worker worker : master.workers()) {
- this.workers.put(worker.location(), worker);
- }
+ this.leaderWorker = this.master.workers().get(0);
this.broadcast(StartMessage.instance());
-
}
@Override
@@ -105,7 +100,7 @@ public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
}
}
this.barriers.clear();
- this.master.send(this.workers.get(this.leaderWorker), StartMessage.instance());
+ this.master.send(this.leaderWorker, StartMessage.instance());
} else {
while (this.traversal.hasNext()) {
this.results.add((Traverser.Admin) this.traversal.nextTraverser());
@@ -123,7 +118,7 @@ public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
}
private void broadcast(final Object message) {
- for (final Address.Worker worker : this.workers.values()) {
+ for (final Address.Worker worker : this.master.workers()) {
this.master.send(worker, message);
}
}
@@ -145,7 +140,7 @@ public class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
if (traverser.isHalted())
this.results.add(traverser);
else if (traverser.get() instanceof Element)
- this.master.send(this.workers.get("worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()), traverser);
+ this.master.send(this.master.workers().get(this.partitioner.getPartitions().indexOf(this.partitioner.getPartition((Element) traverser.get()))), traverser);
else
this.master.send(this.master.address(), traverser);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1c342d5e/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
index 58e06d6..4275caa 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
@@ -61,8 +61,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
private final Partition localPartition;
private final Partitioner partitioner;
//
- private final Map<String, Address.Worker> workers = new HashMap<>();
- private final String neighborWorker;
+ private Address.Worker neighborWorker;
private boolean isLeader;
private Terminate terminate = null;
private boolean voteToHalt = false;
@@ -70,10 +69,10 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
this.self = self;
- System.out.println("worker[created]: " + this.self.address().location());
+ // System.out.println("worker[created]: " + this.self.address().location());
// set up partition and traversal information
- this.localPartition = self.partition();
this.partitioner = partitioner;
+ this.localPartition = self.partition();
final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
this.matrix = new TraversalMatrix<>(traversal);
@@ -88,19 +87,14 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier(
() -> IteratorUtils.filter(this.localPartition.edges(graphStep.getIds()), this.localPartition::contains));
}
- // create termination ring topology
- final int i = this.partitioner.getPartitions().indexOf(this.localPartition);
- this.neighborWorker = "../worker-" + this.partitioner.getPartitions().get(i == this.partitioner.getPartitions().size() - 1 ? 0 : i + 1).hashCode();
- this.isLeader = i == 0;
- for (final Address.Worker worker : self.workers()) {
- //if (!worker.equals(this.self.address()))
- this.workers.put(worker.location(), worker);
- }
}
@Override
public void setup() {
-
+ // create termination ring topology
+ final int i = this.self.workers().indexOf(this.self.address());
+ this.neighborWorker = this.self.workers().get(i == this.self.workers().size() - 1 ? 0 : i + 1);
+ this.isLeader = i == 0;
}
@Override
@@ -113,7 +107,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
this.sendTraverser(step.next());
}
// internal vote to have in mailbox as final message to process
- // assert null == this.terminate;
+ assert null == this.terminate;
if (this.isLeader) {
this.terminate = Terminate.MAYBE;
this.self.send(this.self.address(), VoteToHaltMessage.instance());
@@ -125,7 +119,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
} else if (message instanceof SideEffectSetMessage) {
this.matrix.getTraversal().getSideEffects().set(((SideEffectSetMessage) message).getKey(), ((SideEffectSetMessage) message).getValue());
} else if (message instanceof Terminate) {
- // assert this.isLeader || this.terminate != Terminate.MAYBE;
+ assert this.isLeader || this.terminate != Terminate.MAYBE;
this.terminate = (Terminate) message;
this.self.send(this.self.address(), VoteToHaltMessage.instance());
} else if (message instanceof VoteToHaltMessage) {
@@ -145,9 +139,9 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
if (this.voteToHalt && Terminate.YES == this.terminate)
this.self.send(this.self.master(), VoteToHaltMessage.instance());
else
- this.self.send(this.workers.get(this.neighborWorker), Terminate.YES);
+ this.self.send(this.neighborWorker, Terminate.YES);
} else
- this.self.send(this.workers.get(this.neighborWorker), this.voteToHalt ? this.terminate : Terminate.NO);
+ this.self.send(this.neighborWorker, this.voteToHalt ? this.terminate : Terminate.NO);
this.terminate = null;
this.voteToHalt = true;
}
@@ -169,7 +163,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
//////////////
private void processTraverser(final Traverser.Admin traverser) {
- // assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get());
+ assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.localPartition.contains((Element) traverser.get());
final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
if (step instanceof Bypassing) ((Bypassing) step).setBypass(true);
GraphComputing.atMaster(step, false);
@@ -188,7 +182,7 @@ public class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
if (traverser.isHalted())
this.self.send(this.self.master(), traverser);
else if (traverser.get() instanceof Element && !this.localPartition.contains((Element) traverser.get()))
- this.self.send(this.workers.get("../worker-" + this.partitioner.getPartition((Element) traverser.get()).hashCode()), traverser);
+ this.self.send(this.self.workers().get(this.partitioner.getPartitions().indexOf(this.partitioner.getPartition((Element) traverser.get()))), traverser);
else
this.self.send(this.self.address(), traverser);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1c342d5e/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java
index 77be06b..207dd57 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/ActorStep.java
@@ -32,6 +32,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
@@ -70,7 +71,7 @@ public final class ActorStep<S, E> extends AbstractStep<E, E> {
if (this.first) {
this.first = false;
try {
- final Actors<S, E> actors = this.actorsClass.getConstructor(ActorProgram.class, Partitioner.class).newInstance(new TraversalActorProgram(this.partitionTraversal, partitioner), this.partitioner);
+ final Actors<TraverserSet<E>> actors = this.actorsClass.getConstructor(ActorProgram.class, Partitioner.class).newInstance(new TraversalActorProgram<E>(this.partitionTraversal, partitioner), this.partitioner);
actors.submit().get().forEach(this.starts::add);
} catch (final Exception e) {
throw new IllegalStateException(e.getMessage(), e);