You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/15 18:33:32 UTC

tinkerpop git commit: committed to g.withProcessor(Processor.Description). Actors and Computer implement Processor.Description. Following the VertexProgramStrategy model, this makes it easy for language variant providers to support any arbitrary Processor

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 df2b6a484 -> fa481e907


committed to g.withProcessor(Processor.Description). Actors and Computer implement Processor.Description. Following the VertexProgramStrategy model, this makes it easy for language variant providers to support any arbitrary Processor down the line.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/fa481e90
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/fa481e90
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/fa481e90

Branch: refs/heads/TINKERPOP-1564
Commit: fa481e907ebaac7076b7f974db6137b0698842e3
Parents: df2b6a4
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Dec 15 11:33:28 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Dec 15 11:33:28 2016 -0700

----------------------------------------------------------------------
 .../akka/process/actor/AkkaGraphActors.java     |  3 +-
 .../akka/process/AkkaActorsProvider.java        |  2 +-
 .../gremlin/akka/process/AkkaPlayTest.java      |  2 +-
 .../tinkerpop/gremlin/process/Processor.java    |  2 +-
 .../tinkerpop/gremlin/process/actor/Actors.java | 22 +++--
 .../decoration/ActorProgramStrategy.java        | 87 +++++++++++++++++---
 .../gremlin/process/computer/Computer.java      |  3 +-
 .../decoration/VertexProgramStrategy.java       |  2 +-
 .../process/traversal/TraversalSource.java      |  5 +-
 .../gremlin/process/traversal/BytecodeTest.java |  8 +-
 10 files changed, 104 insertions(+), 32 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
index 2f62beb..51747ac 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
@@ -34,6 +34,7 @@ import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 
 import java.net.InetAddress;
 import java.net.UnknownHostException;
+import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Future;
 import java.util.stream.Collectors;
@@ -51,7 +52,7 @@ public final class AkkaGraphActors<R> implements GraphActors<R> {
         final Config config = ConfigFactory.defaultApplication().
                 withValue("message-priorities",
                         ConfigValueFactory.fromAnyRef(actorProgram.getMessagePriorities().get().stream().map(Class::getCanonicalName).collect(Collectors.toList()).toString()));
-        this.system = ActorSystem.create("traversal-" + actorProgram.hashCode(), config);
+        this.system = ActorSystem.create("traversal-" + UUID.randomUUID(), config);
         try {
             this.master = new Address.Master(this.system.actorOf(Props.create(MasterActor.class, actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost());
         } catch (final UnknownHostException e) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
index b36e3b5..3e9f5df 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
@@ -147,7 +147,7 @@ public class AkkaActorsProvider extends AbstractGraphProvider {
         else {
             final GraphTraversalSource g = graph.traversal();
             return RANDOM.nextBoolean() ?
-                    g.withProcessor(Actors.of(AkkaGraphActors.class).partitioner(new HashPartitioner(graph.partitioner(), new Random().nextInt(15) + 1))) :
+                    g.withProcessor(Actors.of(AkkaGraphActors.class).workers(new Random().nextInt(15) + 1)) :
                     g.withProcessor(Actors.of(AkkaGraphActors.class));
         }
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
index 93fac3d..7de8304 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
@@ -42,7 +42,7 @@ public class AkkaPlayTest {
     public void testPlay1() throws Exception {
         final Graph graph = TinkerGraph.open();
         graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo");
-        GraphTraversalSource g = graph.traversal().withProcessor(Actors.of(AkkaGraphActors.class).partitioner(new HashPartitioner(graph.partitioner(), 3)));
+        GraphTraversalSource g = graph.traversal().withProcessor(Actors.of(AkkaGraphActors.class).workers(3));
         // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList());
 
         for (int i = 0; i < 1000; i++) {

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java
index b55415c..bffda58 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/Processor.java
@@ -46,7 +46,7 @@ public interface Processor {
          *
          * @param traversalSource the traversal source to add processor-specific strategies to
          */
-        public void addTraversalStrategies(final TraversalSource traversalSource);
+        public TraversalSource addTraversalStrategies(final TraversalSource traversalSource);
     }
 
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
index 0822017..d3b5d17 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actors.java
@@ -22,15 +22,14 @@ package org.apache.tinkerpop.gremlin.process.actor;
 import org.apache.tinkerpop.gremlin.process.Processor;
 import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration.ActorProgramStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSource;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class Actors implements Processor.Description<GraphActors> {
 
-    private final Class<? extends GraphActors> graphActorsClass;
-    private Partitioner partitioner = null;
+    private Class<? extends GraphActors> graphActorsClass;
+    private int workers = 1;
 
     private Actors(final Class<? extends GraphActors> graphActorsClass) {
         this.graphActorsClass = graphActorsClass;
@@ -40,9 +39,15 @@ public final class Actors implements Processor.Description<GraphActors> {
         return new Actors(graphActorsClass);
     }
 
-    public Actors partitioner(final Partitioner partitioner) {
+    public Actors graphActors(final Class<? extends GraphActors> graphActorsClass) {
         final Actors clone = this.clone();
-        clone.partitioner = partitioner;
+        clone.graphActorsClass = graphActorsClass;
+        return clone;
+    }
+
+    public Actors workers(final int workers) {
+        final Actors clone = this.clone();
+        clone.workers = workers;
         return clone;
     }
 
@@ -50,8 +55,8 @@ public final class Actors implements Processor.Description<GraphActors> {
         return this.graphActorsClass;
     }
 
-    public Partitioner getPartitioner() {
-        return this.partitioner;
+    public int getWorkers() {
+        return this.workers;
     }
 
 
@@ -69,8 +74,9 @@ public final class Actors implements Processor.Description<GraphActors> {
     }
 
     @Override
-    public void addTraversalStrategies(final TraversalSource traversalSource) {
+    public TraversalSource addTraversalStrategies(final TraversalSource traversalSource) {
         final ActorProgramStrategy actorProgramStrategy = new ActorProgramStrategy(this);
         traversalSource.getStrategies().addStrategies(actorProgramStrategy);
+        return traversalSource;
     }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java
index 26d3eec..81bcda6 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java
@@ -19,6 +19,8 @@
 
 package org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration;
 
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.MapConfiguration;
 import org.apache.tinkerpop.gremlin.process.actor.Actors;
 import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
 import org.apache.tinkerpop.gremlin.process.actor.traversal.step.map.TraversalActorProgramStep;
@@ -29,10 +31,14 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
 import org.apache.tinkerpop.gremlin.structure.util.empty.EmptyGraph;
+import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 
 /**
@@ -43,16 +49,14 @@ public final class ActorProgramStrategy extends AbstractTraversalStrategy<Traver
 
     private static final Set<Class<? extends DecorationStrategy>> PRIORS = Collections.singleton(RemoteStrategy.class);
 
-    private final Partitioner partitioner;
-    private final Class<? extends GraphActors> actors;
+    private final Actors actors;
 
-    private ActorProgramStrategy(final Class<? extends GraphActors> actors, final Partitioner partitioner) {
-        this.actors = actors;
-        this.partitioner = partitioner;
+    private ActorProgramStrategy() {
+        this(null);
     }
 
     public ActorProgramStrategy(final Actors actors) {
-        this(actors.getGraphActorsClass(), actors.getPartitioner());
+        this.actors = actors;
     }
 
     @Override
@@ -62,10 +66,10 @@ public final class ActorProgramStrategy extends AbstractTraversalStrategy<Traver
         if (!(traversal.getParent() instanceof EmptyStep))
             return;
 
-        final TraversalActorProgramStep<?, ?> actorStep = new TraversalActorProgramStep<>(traversal, this.actors,
-                null == this.partitioner ?
+        final TraversalActorProgramStep<?, ?> actorStep = new TraversalActorProgramStep<>(traversal, this.actors.getGraphActorsClass(),
+                1 == this.actors.getWorkers() ?
                         traversal.getGraph().orElse(EmptyGraph.instance()).partitioner() :
-                        this.partitioner);
+                        new HashPartitioner(traversal.getGraph().orElse(EmptyGraph.instance()).partitioner(), this.actors.getWorkers()));
         TraversalHelper.removeAllSteps(traversal);
         traversal.addStep(actorStep);
 
@@ -79,5 +83,68 @@ public final class ActorProgramStrategy extends AbstractTraversalStrategy<Traver
     public Set<Class<? extends DecorationStrategy>> applyPrior() {
         return PRIORS;
     }
+
+    ////////////////////////////////////////////////////////////
+
+    public static final String GRAPH_ACTORS = "graphActors";
+    public static final String WORKERS = "workers";
+
+    @Override
+    public Configuration getConfiguration() {
+        final Map<String, Object> map = new HashMap<>();
+        map.put(GRAPH_ACTORS, this.actors.getGraphActorsClass().getCanonicalName());
+        map.put(WORKERS, this.actors.getWorkers());
+        return new MapConfiguration(map);
+    }
+
+    public static ActorProgramStrategy create(final Configuration configuration) {
+        try {
+            final ActorProgramStrategy.Builder builder = ActorProgramStrategy.build();
+            for (final String key : (List<String>) IteratorUtils.asList(configuration.getKeys())) {
+                if (key.equals(GRAPH_ACTORS))
+                    builder.graphComputer((Class) Class.forName(configuration.getString(key)));
+                else if (key.equals(WORKERS))
+                    builder.workers(configuration.getInt(key));
+                else
+                    throw new IllegalArgumentException("The provided key is unknown: " + key);
+            }
+            return builder.create();
+        } catch (final ClassNotFoundException e) {
+            throw new IllegalArgumentException(e.getMessage(), e);
+        }
+    }
+
+    public static ActorProgramStrategy.Builder build() {
+        return new ActorProgramStrategy.Builder();
+    }
+
+    public final static class Builder {
+
+        private Actors actors = Actors.of(GraphActors.class);
+
+        private Builder() {
+        }
+
+        public Builder computer(final Actors actors) {
+            this.actors = actors;
+            return this;
+        }
+
+        public Builder graphComputer(final Class<? extends GraphActors> graphActorsClass) {
+            this.actors = this.actors.graphActors(graphActorsClass);
+            return this;
+        }
+
+
+        public Builder workers(final int workers) {
+            this.actors = this.actors.workers(workers);
+            return this;
+        }
+
+        public ActorProgramStrategy create() {
+            return new ActorProgramStrategy(this.actors);
+        }
+    }
+
 }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
index 9214a5e..8691a41 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/Computer.java
@@ -168,7 +168,7 @@ public final class Computer implements Processor.Description<GraphComputer>, Fun
     }
 
     @Override
-    public void addTraversalStrategies(final TraversalSource traversalSource) {
+    public TraversalSource addTraversalStrategies(final TraversalSource traversalSource) {
         Class<? extends GraphComputer> graphComputerClass;
         if (this.getGraphComputerClass().equals(GraphComputer.class)) {
             try {
@@ -181,6 +181,7 @@ public final class Computer implements Processor.Description<GraphComputer>, Fun
         final List<TraversalStrategy<?>> graphComputerStrategies = TraversalStrategies.GlobalCache.getStrategies(graphComputerClass).toList();
         traversalSource.getStrategies().addStrategies(graphComputerStrategies.toArray(new TraversalStrategy[graphComputerStrategies.size()]));
         traversalSource.getStrategies().addStrategies(new VertexProgramStrategy(this));
+        return traversalSource;
     }
 
     /////////////////

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java
index 89e40cb..ac2e75a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/strategy/decoration/VertexProgramStrategy.java
@@ -55,7 +55,7 @@ import java.util.Set;
  */
 public final class VertexProgramStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy> implements TraversalStrategy.DecorationStrategy {
 
-    private static final VertexProgramStrategy INSTANCE = new VertexProgramStrategy(Computer.compute());
+    private static final VertexProgramStrategy INSTANCE = new VertexProgramStrategy(Computer.of());
 
     private final Computer computer;
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
index b6d948d..20926fc 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
@@ -137,10 +137,7 @@ public interface TraversalSource extends Cloneable, AutoCloseable {
      * @return a new traversal source with updated strategies
      */
     public default TraversalSource withProcessor(final Processor.Description processor) {
-        final TraversalSource clone = this.clone();
-        processor.addTraversalStrategies(clone);
-        clone.getBytecode().addSource(Symbols.withProcessor, processor);
-        return clone;
+        return processor.addTraversalStrategies(this.clone());
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/fa481e90/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java
index 7b1d810..62ef89b 100644
--- a/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java
+++ b/gremlin-core/src/test/java/org/apache/tinkerpop/gremlin/process/traversal/BytecodeTest.java
@@ -112,16 +112,16 @@ public class BytecodeTest {
         assertEquals(P.gt(32), bytecode.getBindings().get("c"));
         assertEquals("name", bytecode.getBindings().get("d"));
         //
-        Bytecode.Binding binding = (Bytecode.Binding)((List<Bytecode.Instruction>)bytecode.getStepInstructions()).get(1).getArguments()[0];
+        Bytecode.Binding binding = (Bytecode.Binding)(bytecode.getStepInstructions()).get(1).getArguments()[0];
         assertEquals("a", binding.variable());
         assertEquals("created", binding.value());
-        binding = (Bytecode.Binding) ((List<Bytecode.Instruction>)((Bytecode)((List<Bytecode.Instruction>)bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(0).getArguments()[0];
+        binding = (Bytecode.Binding) (((Bytecode)(bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(0).getArguments()[0];
         assertEquals("b", binding.variable());
         assertEquals("knows", binding.value());
-        binding = (Bytecode.Binding) ((List<Bytecode.Instruction>)((Bytecode)((List<Bytecode.Instruction>)bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(1).getArguments()[1];
+        binding = (Bytecode.Binding) (((Bytecode)(bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(1).getArguments()[1];
         assertEquals("c", binding.variable());
         assertEquals(P.gt(32), binding.value());
-        binding = (Bytecode.Binding) ((List<Bytecode.Instruction>)((Bytecode)((List<Bytecode.Instruction>)((Bytecode)((List<Bytecode.Instruction>)bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(0).getArguments()[0];
+        binding = (Bytecode.Binding) (((Bytecode)(((Bytecode)(bytecode.getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(2).getArguments()[0]).getStepInstructions()).get(0).getArguments()[0];
         assertEquals("d", binding.variable());
         assertEquals("name", binding.value());
     }