You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/15 18:33:32 UTC
tinkerpop git commit: committed to
g.withProcessor(Processor.Description). Actors and Computer implement
Processor.Description. Following the VertexProgramStrategy model,
this makes it easy for language variant providers to support any arbitrary
Processor
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 df2b6a484 -> fa481e907
committed to g.withProcessor(Processor.Description). Actors and Computer implement Processor.Description. Following the VertexProgramStrategy model, this makes it easy for language variant providers to support any arbitrary Processor down the line.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/fa481e90
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/fa481e90
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/fa481e90
Branch: refs/heads/TINKERPOP-1564
Commit: fa481e907ebaac7076b7f974db6137b0698842e3
Parents: df2b6a4
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Dec 15 11:33:28 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Dec 15 11:33:28 2016 -0700
----------------------------------------------------------------------
.../akka/process/actor/AkkaGraphActors.java | 3 +-
.../akka/process/AkkaActorsProvider.java | 2 +-
.../gremlin/akka/process/AkkaPlayTest.java | 2 +-
.../tinkerpop/gremlin/process/Processor.java | 2 +-
.../tinkerpop/gremlin/process/actor/Actors.java | 22 +++--
.../decoration/ActorProgramStrategy.java | 87 +++++++++++++++++---
.../gremlin/process/computer/Computer.java | 3 +-
.../decoration/VertexProgramStrategy.java | 2 +-
.../process/traversal/TraversalSource.java | 5 +-
.../gremlin/process/traversal/BytecodeTest.java | 8 +-
10 files changed, 104 insertions(+), 32 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
index 2f62beb..51747ac 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
@@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import java.net.InetAddress;
import java.net.UnknownHostException;
+import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.stream.Collectors;
@@ -51,7 +52,7 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
final Config config = ConfigFactory.defaultApplication().
withValue("message-priorities",
ConfigValueFactory.fromAnyRef(actorProgram.getMessagePriorities().get().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString()));
- this.system = ActorSystem.create("traversal-" + actorProgram.hashCode(), config);
+ this.system = ActorSystem.create("traversal-" + UUID.randomUUID(), config);
try {
this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost());
} catch (final UnknownHostException e) {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
index b36e3b5..3e9f5df 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
@@ -147,7 +147,7 @@ public class AkkaActorsProvider extends AbstractGraphProvider {
else {
final GraphTraversalSource g = graph.traversal();
return RANDOM.nextBoolean() ?
- g.withProcessor(Actors.of(AkkaGraphActors.class).partitioner(new HashPartitioner(graph.partitioner(), new Random().nextInt(15) + 1))) :
+ g.withProcessor(Actors.of(AkkaGraphActors.class).workers(new Random().nextInt(15) + 1)) :
g.withProcessor(Actors.of(AkkaGraphActors.class));
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
index 93fac3d..7de8304 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
@@ -42,7 +42,7 @@ public class AkkaPlayTest {
public void testPlay1() throws Exception {
final Graph graph = TinkerGraph.open();
graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo");
- GraphTraversalSource g = graph.traversal().withProcessor(Actors.of(AkkaGraphActors.class).partitioner(new HashPartitioner(graph.partitioner(), 3)));
+ GraphTraversalSource g = graph.traversal().withProcessor(Actors.of(AkkaGraphActors.class).workers(3));
// System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList());
for (int i = 0; i < 1000; i++) {
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java
index b55415c..bffda58 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java
@@ -46,7 +46,7 @@ public interface Processor {
*
* @param traversalSource the traversal source to add processor-specific strategies to
*/
- public void addTraversalStrategies(final TraversalSource traversalSource);
+ public TraversalSource addTraversalStrategies(final TraversalSource traversalSource);
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
index 0822017..d3b5d17 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
@@ -22,15 +22,14 @@ package org.apache.tinkerpop.gremlin.process.actor;
import org.apache.tinkerpop.gremlin.process.Processor;
import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration.ActorProgramStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class Actors implements Processor.Description<GraphActors> {
- private final Class<? extends GraphActors> graphActorsClass;
- private Partitioner partitioner = null;
+ private Class<? extends GraphActors> graphActorsClass;
+ private int workers = 1;
private Actors(final Class<? extends GraphActors> graphActorsClass) {
this.graphActorsClass = graphActorsClass;
@@ -40,9 +39,15 @@ public final class Actors implements Processor.Description<GraphActors> {
return new Actors(graphActorsClass);
}
- public Actors partitioner(final Partitioner partitioner) {
+ public Actors graphActors(final Class<? extends GraphActors> graphActorsClass) {
final Actors clone = this.clone();
- clone.partitioner = partitioner;
+ clone.graphActorsClass = graphActorsClass;
+ return clone;
+ }
+
+ public Actors workers(final int workers) {
+ final Actors clone = this.clone();
+ clone.workers = workers;
return clone;
}
@@ -50,8 +55,8 @@ public final class Actors implements Processor.Description<GraphActors> {
return this.graphActorsClass;
}
- public Partitioner getPartitioner() {
- return this.partitioner;
+ public int getWorkers() {
+ return this.workers;
}
@@ -69,8 +74,9 @@ public final class Actors implements Processor.Description<GraphActors> {
}
@Override
- public void addTraversalStrategies(final TraversalSource traversalSource) {
+ public TraversalSource addTraversalStrategies(final TraversalSource traversalSource) {
final ActorProgramStrategy actorProgramStrategy = new ActorProgramStrategy(this);
traversalSource.getStrategies().addStrategies(actorProgramStrategy);
+ return traversalSource;
}
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java
index 26d3eec..81bcda6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java
@@ -19,6 +19,8 @@
package org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
import org.apache.tinkerpop.gremlin.process.actor.Actors;
import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
import org.apache.tinkerpop.gremlin.process.actor.traversal.step.map.TraversalActorProgramStep;
@@ -29,10 +31,14 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
import java.util.Set;
/**
@@ -43,16 +49,14 @@ public final class ActorProgramStrategy extends AbstractTraversalStrategy<Traver
private static final Set<Class<? extends DecorationStrategy>> PRIORS = Collections.singleton(RemoteStrategy.class);
- private final Partitioner partitioner;
- private final Class<? extends GraphActors> actors;
+ private final Actors actors;
- private ActorProgramStrategy(final Class<? extends GraphActors> actors, final Partitioner partitioner) {
- this.actors = actors;
- this.partitioner = partitioner;
+ private ActorProgramStrategy() {
+ this(null);
}
public ActorProgramStrategy(final Actors actors) {
- this(actors.getGraphActorsClass(), actors.getPartitioner());
+ this.actors = actors;
}
@Override
@@ -62,10 +66,10 @@ public final class ActorProgramStrategy extends AbstractTraversalStrategy<Traver
if (!(traversal.getParent() instanceof EmptyStep))
return;
- final TraversalActorProgramStep<?, ?> actorStep = new TraversalActorProgramStep<>(traversal, this.actors,
- null == this.partitioner ?
+ final TraversalActorProgramStep<?, ?> actorStep = new TraversalActorProgramStep<>(traversal, this.actors.getGraphActorsClass(),
+ 1 == this.actors.getWorkers() ?
traversal.getGraph().orElse(EmptyGraph.instance()).partitioner() :
- this.partitioner);
+ new HashPartitioner(traversal.getGraph().orElse(EmptyGraph.instance()).partitioner(), this.actors.getWorkers()));
TraversalHelper.removeAllSteps(traversal);
traversal.addStep(actorStep);
@@ -79,5 +83,68 @@ public final class ActorProgramStrategy extends AbstractTraversalStrategy<Traver
public Set<Class<? extends DecorationStrategy>> applyPrior() {
return PRIORS;
}
+
+ ////////////////////////////////////////////////////////////
+
+ public static final String GRAPH_ACTORS = "graphActors";
+ public static final String WORKERS = "workers";
+
+ @Override
+ public Configuration getConfiguration() {
+ final Map<String, Object> map = new HashMap<>();
+ map.put(GRAPH_ACTORS, this.actors.getGraphActorsClass().getCanonicalName());
+ map.put(WORKERS, this.actors.getWorkers());
+ return new MapConfiguration(map);
+ }
+
+ public static ActorProgramStrategy create(final Configuration configuration) {
+ try {
+ final ActorProgramStrategy.Builder builder = ActorProgramStrategy.build();
+ for (final String key : (List<String>) IteratorUtils.asList(configuration.getKeys())) {
+ if (key.equals(GRAPH_ACTORS))
+ builder.graphComputer((Class) Class.forName(configuration.getString(key)));
+ else if (key.equals(WORKERS))
+ builder.workers(configuration.getInt(key));
+ else
+ throw new IllegalArgumentException("The provided key is unknown: " + key);
+ }
+ return builder.create();
+ } catch (final ClassNotFoundException e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+
+ public static ActorProgramStrategy.Builder build() {
+ return new ActorProgramStrategy.Builder();
+ }
+
+ public final static class Builder {
+
+ private Actors actors = Actors.of(GraphActors.class);
+
+ private Builder() {
+ }
+
+ public Builder computer(final Actors actors) {
+ this.actors = actors;
+ return this;
+ }
+
+ public Builder graphComputer(final Class<? extends GraphActors> graphActorsClass) {
+ this.actors = this.actors.graphActors(graphActorsClass);
+ return this;
+ }
+
+
+ public Builder workers(final int workers) {
+ this.actors = this.actors.workers(workers);
+ return this;
+ }
+
+ public ActorProgramStrategy create() {
+ return new ActorProgramStrategy(this.actors);
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
index 9214a5e..8691a41 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
@@ -168,7 +168,7 @@ public final class Computer implements Processor.Description<GraphComputer>, Fun
}
@Override
- public void addTraversalStrategies(final TraversalSource traversalSource) {
+ public TraversalSource addTraversalStrategies(final TraversalSource traversalSource) {
Class<? extends GraphComputer> graphComputerClass;
if (this.getGraphComputerClass().equals(GraphComputer.class)) {
try {
@@ -181,6 +181,7 @@ public final class Computer implements Processor.Description<GraphComputer>, Fun
final List<TraversalStrategy<?>> graphComputerStrategies = TraversalStrategies.GlobalCache.getStrategies(graphComputerClass).toList();
traversalSource.getStrategies().addStrategies(graphComputerStrategies.toArray(new TraversalStrategy[graphComputerStrategies.size()]));
traversalSource.getStrategies().addStrategies(new VertexProgramStrategy(this));
+ return traversalSource;
}
/////////////////
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java
index 89e40cb..ac2e75a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java
@@ -55,7 +55,7 @@ import java.util.Set;
*/
public final class VertexProgramStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy> implements TraversalStrategy.DecorationStrategy {
- private static final VertexProgramStrategy INSTANCE = new VertexProgramStrategy(Computer.compute());
+ private static final VertexProgramStrategy INSTANCE = new VertexProgramStrategy(Computer.of());
private final Computer computer;
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
index b6d948d..20926fc 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
@@ -137,10 +137,7 @@ public interface TraversalSource extends Cloneable, AutoCloseable {
* @return a new traversal source with updated strategies
*/
public default TraversalSource withProcessor(final Processor.Description processor) {
- final TraversalSource clone = this.clone();
- processor.addTraversalStrategies(clone);
- clone.getBytecode().addSource(Symbols.withProcessor, processor);
- return clone;
+ return processor.addTraversalStrategies(this.clone());
}
/**
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java
index 7b1d810..62ef89b 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java
@@ -112,16 +112,16 @@ public class BytecodeTest {
assertEquals(P.gt(32), bytecode.getBindings().get("c"));
assertEquals("name", bytecode.getBindings().get("d"));
//
- Bytecode.Binding binding = (Bytecode.Binding)((List<Bytecode.Instruction>)bytecode.getStepInstructions()).get(1).getArguments()[0];
+ Bytecode.Binding binding = (Bytecode.Binding)(bytecode.getStepInstructions()).get(1).getArguments()[0];
assertEquals("a", binding.variable());
assertEquals("created", binding.value());
- binding = (Bytecode.Binding) ((List<Bytecode.Instruction>)((Bytecode)((List<Bytecode.Instruction>)bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(0).getArguments()[0];
+ binding = (Bytecode.Binding) (((Bytecode)(bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(0).getArguments()[0];
assertEquals("b", binding.variable());
assertEquals("knows", binding.value());
- binding = (Bytecode.Binding) ((List<Bytecode.Instruction>)((Bytecode)((List<Bytecode.Instruction>)bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(1).getArguments()[1];
+ binding = (Bytecode.Binding) (((Bytecode)(bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(1).getArguments()[1];
assertEquals("c", binding.variable());
assertEquals(P.gt(32), binding.value());
- binding = (Bytecode.Binding) ((List<Bytecode.Instruction>)((Bytecode)((List<Bytecode.Instruction>)((Bytecode)((List<Bytecode.Instruction>)bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(0).getArguments()[0];
+ binding = (Bytecode.Binding) (((Bytecode)(((Bytecode)(bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(0).getArguments()[0];
assertEquals("d", binding.variable());
assertEquals("name", binding.value());
}