You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/07 11:59:47 UTC

tinkerpop git commit: Added a working actor system in the test package of TinkerGraph (currently for play).

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 0b07e6d47 -> 8c4c915c0


Added a working actor system in the test package of TinkerGraph (currently for play).


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/8c4c915c
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/8c4c915c
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/8c4c915c

Branch: refs/heads/TINKERPOP-1564
Commit: 8c4c915c0a05d8b500774db4cb1fb84a1bb39dea
Parents: 0b07e6d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 7 04:59:42 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 7 04:59:42 2016 -0700

----------------------------------------------------------------------
 .../process/traversal/TraversalSource.java      |  3 +-
 .../gremlin_python/process/graph_traversal.py   |  4 +
 tinkergraph-gremlin/pom.xml                     | 20 +++++
 .../process/akka/MasterTraversalActor.java      | 74 +++++++++++++++++++
 .../process/akka/TinkerActorSystem.java         | 66 +++++++++++++++++
 .../process/akka/WorkerTraversalActor.java      | 78 ++++++++++++++++++++
 6 files changed, 243 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8c4c915c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
index 02ccfcf..d0a5179 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
@@ -26,7 +26,6 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decorati
 import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SackStrategy;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SideEffectStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.finalization.PartitionerStrategy;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Partitioner;
 import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
@@ -132,7 +131,7 @@ public interface TraversalSource extends Cloneable, AutoCloseable {
 
     public default TraversalSource withPartitioner(final Partitioner partitioner) {
         final TraversalSource clone = this.clone(); // all changes below are made on the clone, which is what gets returned
-        clone.getStrategies().addStrategies(new PartitionerStrategy(partitioner));
+        //clone.getStrategies().addStrategies(new PartitionerStrategy(partitioner));
         clone.getBytecode().addSource(Symbols.withPartitioner, partitioner); // the partitioner is recorded as a source instruction in the clone's bytecode
         return clone;
     }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8c4c915c/gremlin-python/src/main/jython/gremlin_python/process/graph_traversal.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/process/graph_traversal.py b/gremlin-python/src/main/jython/gremlin_python/process/graph_traversal.py
index b712613..d3ec713 100644
--- a/gremlin-python/src/main/jython/gremlin_python/process/graph_traversal.py
+++ b/gremlin-python/src/main/jython/gremlin_python/process/graph_traversal.py
@@ -42,6 +42,10 @@ class GraphTraversalSource(object):
     source = GraphTraversalSource(self.graph, TraversalStrategies(self.traversal_strategies), Bytecode(self.bytecode))
     source.bytecode.add_source("withBulk", *args)
     return source
+  def withPartitioner(self, *args):  # same shape as the neighbouring with* methods: build a new source, append one bytecode source instruction, return it
+    source = GraphTraversalSource(self.graph, TraversalStrategies(self.traversal_strategies), Bytecode(self.bytecode))  # new source sharing self.graph, with this source's strategies and bytecode wrapped in new objects
+    source.bytecode.add_source("withPartitioner", *args)  # args are passed through unchanged
+    return source
   def withPath(self, *args):
     source = GraphTraversalSource(self.graph, TraversalStrategies(self.traversal_strategies), Bytecode(self.bytecode))
     source.bytecode.add_source("withPath", *args)

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8c4c915c/tinkergraph-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/pom.xml b/tinkergraph-gremlin/pom.xml
index 02e32d7..1965329 100644
--- a/tinkergraph-gremlin/pom.xml
+++ b/tinkergraph-gremlin/pom.xml
@@ -54,6 +54,26 @@ limitations under the License.
             <artifactId>slf4j-log4j12</artifactId>
             <scope>test</scope>
         </dependency>
+        <!-- PLAYING WITH AKKA -->
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-actor_2.11</artifactId>
+            <version>2.4.14</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.scala-lang</groupId>
+                    <artifactId>scala-library</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.scala-lang</groupId>
+            <artifactId>scala-library</artifactId>
+            <version>2.11.8</version>
+            <scope>test</scope>
+        </dependency>
+        <!-- ................ -->
     </dependencies>
     <build>
         <directory>${basedir}/target</directory>

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8c4c915c/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
new file mode 100644
index 0000000..4ee2494
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -0,0 +1,74 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.pf.ReceiveBuilder;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Actor that creates one {@link WorkerTraversalActor} per partition, tells each to start, and prints every halted traverser it receives.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MasterTraversalActor extends AbstractActor {
+
+    private Traversal.Admin<?, ?> traversal; // the computer traversal taken out of the start step in the constructor
+    private final Partitioner partitioner;
+    private List<ActorPath> workers; // paths of the child worker actors; filled by initializeWorkers()
+
+    public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
+        System.out.println("master[created]: " + self().path());
+        traversal.applyStrategies(); // strategies are applied before the start step is inspected; NOTE(review): presumably this is what installs the TraversalVertexProgramStep -- confirm
+        this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get(); // the cast fails with ClassCastException if the start step is any other type
+        this.partitioner = partitioner;
+        this.initializeWorkers(); // reads this.traversal and this.partitioner, so it has to follow the assignments above
+
+        receive(ReceiveBuilder.
+                match(Traverser.Admin.class, traverser -> { // the only message type registered for the master
+                    if (traverser.isHalted())
+                        System.out.println("master[result]: " + traverser); // results are only printed, not stored
+                    else
+                        throw new RuntimeException("Master should only process halted traversers: " + traverser);
+
+                }).build()
+        );
+    }
+
+    public void initializeWorkers() { // creates one child actor per partition, then sends each a start message
+        final List<Partition> partitions = this.partitioner.getPartitions();
+        this.workers = new ArrayList<>(partitions.size());
+        for (final Partition partition : partitions) {
+            final ActorRef worker = context().actorOf(Props.create(WorkerTraversalActor.class, this.traversal.clone(), partition, this.partitioner), "worker-" + partition.hashCode()); // each worker receives its own clone of the traversal; NOTE(review): the child name comes from hashCode(), so two partitions with equal hash codes would request the same name -- confirm uniqueness
+            this.workers.add(worker.path());
+        }
+        for (final ActorPath worker : this.workers) {
+            context().actorSelection(worker).tell(true, self()); // WorkerTraversalActor matches Boolean messages to begin emitting its start traversers
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8c4c915c/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
new file mode 100644
index 0000000..a7f4dd2
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -0,0 +1,66 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
+import org.apache.tinkerpop.gremlin.structure.util.HashPartitioner;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.as;
+
+/** Experimental harness that creates an Akka ActorSystem containing a single "master" {@link MasterTraversalActor} for one traversal.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TinkerActorSystem {
+
+    private final ActorSystem system;
+
+    public TinkerActorSystem(final Traversal.Admin<?, ?> traversal) {
+        this.system = ActorSystem.create("traversal-" + traversal.hashCode()); // system name is derived from the traversal's hash code
+        this.system.actorOf(Props.create(MasterTraversalActor.class,
+                traversal,
+                new HashPartitioner(traversal.getGraph().get().partitioner(), 5)), "master"); // wraps the graph's own partitioner; NOTE(review): 5 is presumably the number of partitions -- confirm against HashPartitioner
+    }
+
+    public void close() {
+        this.system.terminate(); // NOTE(review): terminate() is presumably asynchronous -- confirm whether callers need to wait for shutdown to complete
+    }
+
+    //////////////
+
+    public static void main(String args[]) throws Exception { // demo entry point: loads the "modern" sample graph and runs one match() traversal through the actors
+        final Graph graph = TinkerGraph.open();
+        graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo"); // relative path, so the working directory matters
+        final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().match(
+                as("a").out("created").as("b"),
+                as("b").in("created").as("c"),
+                as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin();
+        final TinkerActorSystem system = new TinkerActorSystem(traversal);
+        Thread.sleep(1000); // fixed one-second pause before shutdown; nothing here checks that the actors have finished
+        system.close();
+    }
+
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8c4c915c/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
new file mode 100644
index 0000000..339647f
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -0,0 +1,78 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
+
+import akka.actor.AbstractActor;
+import akka.japi.pf.ReceiveBuilder;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+/** Actor that executes traversal steps for one {@link Partition}: halted traversers go to the parent actor, traversers on elements outside this partition go to the owning partition's worker.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class WorkerTraversalActor extends AbstractActor {
+
+    private final TraversalMatrix<?, ?> matrix; // used by processTraverser to look a step up by its id
+    private final Partition partition;
+    private final Partitioner partitioner;
+
+    public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition partition, final Partitioner partitioner) {
+        System.out.println("worker[created]: " + self().path());
+        this.matrix = new TraversalMatrix<>(traversal);
+        this.partition = partition;
+        this.partitioner = partitioner;
+        ((GraphStep) traversal.getStartStep()).setIteratorSupplier(partition::vertices); // the start step is fed from this partition's vertices; the cast assumes the start step is a GraphStep
+
+
+        receive(ReceiveBuilder.
+                match(Boolean.class, bool -> { // any Boolean message triggers the start; its value is never read
+                    final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
+                    while (step.hasNext()) {
+                        self().tell(step.next(), self()); // each start traverser is mailed back to this actor and handled by the Traverser.Admin case below
+                    }
+                }).
+                match(Traverser.Admin.class, this::processTraverser).build()
+        );
+    }
+
+    private void processTraverser(final Traverser.Admin traverser) { // routes one traverser: to the parent if halted, otherwise through the step named by its step id
+        if (traverser.isHalted())
+            context().actorSelection("../").tell(traverser, self()); // "../" is a relative selection one level up, i.e. the actor that created this worker
+        else {
+            final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+            step.addStart(traverser);
+            while (step.hasNext()) {
+                final Traverser.Admin<?> end = step.next();
+                if (end.get() instanceof Element && !this.partition.contains((Element) end.get())) { // result is an element that this partition does not contain
+                    final Partition otherPartition = this.partitioner.getPartition((Element) end.get());
+                    context().actorSelection("../worker-" + otherPartition.hashCode()).tell(end, self()); // sibling lookup uses the same "worker-" + hashCode() name that MasterTraversalActor assigns at creation
+                } else {
+                    this.processTraverser(end); // local element or non-Element value: keep processing in this actor, recursively
+                }
+            }
+        }
+    }
+}