You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/07 11:59:47 UTC
tinkerpop git commit: Added a working actor system in the test
package of TinkerGraph (currently for play).
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 0b07e6d47 -> 8c4c915c0
Added a working actor system in the test package of TinkerGraph (currently for play).
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/8c4c915c
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/8c4c915c
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/8c4c915c
Branch: refs/heads/TINKERPOP-1564
Commit: 8c4c915c0a05d8b500774db4cb1fb84a1bb39dea
Parents: 0b07e6d
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 7 04:59:42 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 7 04:59:42 2016 -0700
----------------------------------------------------------------------
.../process/traversal/TraversalSource.java | 3 +-
.../gremlin_python/process/graph_traversal.py | 4 +
tinkergraph-gremlin/pom.xml | 20 +++++
.../process/akka/MasterTraversalActor.java | 74 +++++++++++++++++++
.../process/akka/TinkerActorSystem.java | 66 +++++++++++++++++
.../process/akka/WorkerTraversalActor.java | 78 ++++++++++++++++++++
6 files changed, 243 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8c4c915c/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
index 02ccfcf..d0a5179 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalSource.java
@@ -26,7 +26,6 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.decorati
import org.apache.tinkerpop.gremlin.process.remote.RemoteConnection;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SackStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SideEffectStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.finalization.PartitionerStrategy;
import org.apache.tinkerpop.gremlin.structure.Graph;
import org.apache.tinkerpop.gremlin.structure.Partitioner;
import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
@@ -132,7 +131,7 @@ public interface TraversalSource extends Cloneable, AutoCloseable {
public default TraversalSource withPartitioner(final Partitioner partitioner) {
final TraversalSource clone = this.clone();
- clone.getStrategies().addStrategies(new PartitionerStrategy(partitioner));
+ //clone.getStrategies().addStrategies(new PartitionerStrategy(partitioner));
clone.getBytecode().addSource(Symbols.withPartitioner, partitioner);
return clone;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8c4c915c/gremlin-python/src/main/jython/gremlin_python/process/graph_traversal.py
----------------------------------------------------------------------
diff --git a/gremlin-python/src/main/jython/gremlin_python/process/graph_traversal.py b/gremlin-python/src/main/jython/gremlin_python/process/graph_traversal.py
index b712613..d3ec713 100644
--- a/gremlin-python/src/main/jython/gremlin_python/process/graph_traversal.py
+++ b/gremlin-python/src/main/jython/gremlin_python/process/graph_traversal.py
@@ -42,6 +42,10 @@ class GraphTraversalSource(object):
source = GraphTraversalSource(self.graph, TraversalStrategies(self.traversal_strategies), Bytecode(self.bytecode))
source.bytecode.add_source("withBulk", *args)
return source
+ def withPartitioner(self, *args):
+ source = GraphTraversalSource(self.graph, TraversalStrategies(self.traversal_strategies), Bytecode(self.bytecode))
+ source.bytecode.add_source("withPartitioner", *args)
+ return source
def withPath(self, *args):
source = GraphTraversalSource(self.graph, TraversalStrategies(self.traversal_strategies), Bytecode(self.bytecode))
source.bytecode.add_source("withPath", *args)
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8c4c915c/tinkergraph-gremlin/pom.xml
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/pom.xml b/tinkergraph-gremlin/pom.xml
index 02e32d7..1965329 100644
--- a/tinkergraph-gremlin/pom.xml
+++ b/tinkergraph-gremlin/pom.xml
@@ -54,6 +54,26 @@ limitations under the License.
<artifactId>slf4j-log4j12</artifactId>
<scope>test</scope>
</dependency>
+ <!-- PLAYING WITH AKKA -->
+ <dependency>
+ <groupId>com.typesafe.akka</groupId>
+ <artifactId>akka-actor_2.11</artifactId>
+ <version>2.4.14</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.scala-lang</groupId>
+ <artifactId>scala-library</artifactId>
+ <version>2.11.8</version>
+ <scope>test</scope>
+ </dependency>
+ <!-- ................ -->
</dependencies>
<build>
<directory>${basedir}/target</directory>
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8c4c915c/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
new file mode 100644
index 0000000..4ee2494
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorPath;
+import akka.actor.ActorRef;
+import akka.actor.Props;
+import akka.japi.pf.ReceiveBuilder;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.step.map.TraversalVertexProgramStep;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Actor that creates one WorkerTraversalActor child per partition and prints the halted traversers sent back to it.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MasterTraversalActor extends AbstractActor {
+
+ private Traversal.Admin<?, ?> traversal; // the computerTraversal extracted from the start step in the constructor; cloned once per worker
+ private final Partitioner partitioner; // supplies the partitions that workers are created for
+ private List<ActorPath> workers; // paths of the worker actors created in initializeWorkers()
+
+ public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
+ System.out.println("master[created]: " + self().path()); // debug output to stdout
+ traversal.applyStrategies(); // applied before the start step is inspected on the next line
+ this.traversal = ((TraversalVertexProgramStep) traversal.getStartStep()).computerTraversal.get(); // NOTE(review): unchecked cast; assumes the start step is a TraversalVertexProgramStep once strategies are applied - confirm
+ this.partitioner = partitioner;
+ this.initializeWorkers(); // workers are created and sent their start message before the receive behavior below is installed
+
+ receive(ReceiveBuilder.
+ match(Traverser.Admin.class, traverser -> { // the only message type handled by this actor
+ if (traverser.isHalted())
+ System.out.println("master[result]: " + traverser); // halted traversers are printed as results
+ else
+ throw new RuntimeException("Master should only process halted traversers: " + traverser); // a non-halted traverser arriving here is treated as an error
+
+ }).build()
+ );
+ }
+
+ public void initializeWorkers() { // NOTE(review): public, and invoked from the constructor
+ final List<Partition> partitions = this.partitioner.getPartitions();
+ this.workers = new ArrayList<>(partitions.size());
+ for (final Partition partition : partitions) {
+ final ActorRef worker = context().actorOf(Props.create(WorkerTraversalActor.class, this.traversal.clone(), partition, this.partitioner), "worker-" + partition.hashCode()); // NOTE(review): actor name is derived from partition.hashCode(); two partitions with equal hash codes would yield the same name - confirm uniqueness
+ this.workers.add(worker.path());
+ }
+ for (final ActorPath worker : this.workers) {
+ context().actorSelection(worker).tell(true, self()); // the Boolean is the message WorkerTraversalActor matches to begin emitting its start traversers
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8c4c915c/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
new file mode 100644
index 0000000..a7f4dd2
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import org.apache.tinkerpop.gremlin.process.traversal.P;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
+import org.apache.tinkerpop.gremlin.structure.util.HashPartitioner;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.as;
+
+/** Creates an ActorSystem for a single traversal and starts a MasterTraversalActor named "master" in it; main() is a small demo driver.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TinkerActorSystem {
+
+ private final ActorSystem system; // created in the constructor, terminated by close()
+
+ public TinkerActorSystem(final Traversal.Admin<?, ?> traversal) {
+ this.system = ActorSystem.create("traversal-" + traversal.hashCode()); // system name embeds the traversal's hashCode
+ this.system.actorOf(Props.create(MasterTraversalActor.class,
+ traversal,
+ new HashPartitioner(traversal.getGraph().get().partitioner(), 5)), "master"); // NOTE(review): getGraph().get() is not guarded, so this assumes a graph is attached; 5 is presumably the number of partitions - confirm
+ }
+
+ public void close() { // NOTE(review): the class does not implement AutoCloseable, so this must be called explicitly
+ this.system.terminate(); // the value returned by terminate() is discarded; nothing here waits for shutdown to finish
+ }
+
+ //////////////
+
+ public static void main(String args[]) throws Exception {
+ final Graph graph = TinkerGraph.open();
+ graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo"); // relative path, so it depends on the working directory
+ final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().match(
+ as("a").out("created").as("b"),
+ as("b").in("created").as("c"),
+ as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin();
+ final TinkerActorSystem system = new TinkerActorSystem(traversal); // constructing the system starts the master actor
+ Thread.sleep(1000); // NOTE(review): fixed 1s wait; nothing signals that the actors are done, so work still in flight after 1s would be cut off by close()
+ system.close();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/8c4c915c/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
new file mode 100644
index 0000000..339647f
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
+
+import akka.actor.AbstractActor;
+import akka.japi.pf.ReceiveBuilder;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+/** Actor that evaluates the traversal for one Partition, forwarding traversers whose element is outside that partition to a sibling worker and halted traversers to "../".
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class WorkerTraversalActor extends AbstractActor {
+
+ private final TraversalMatrix<?, ?> matrix; // used to look up a step by the step id carried on a traverser
+ private final Partition partition; // the partition this worker is responsible for
+ private final Partitioner partitioner; // used to find the partition that owns an element outside this worker's partition
+
+ public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition partition, final Partitioner partitioner) {
+ System.out.println("worker[created]: " + self().path()); // debug output to stdout
+ this.matrix = new TraversalMatrix<>(traversal);
+ this.partition = partition;
+ this.partitioner = partitioner;
+ ((GraphStep) traversal.getStartStep()).setIteratorSupplier(partition::vertices); // start step draws from this partition's vertices; NOTE(review): unchecked cast assumes the start step is a GraphStep
+
+
+ receive(ReceiveBuilder.
+ match(Boolean.class, bool -> { // any Boolean starts the worker; its value is not inspected
+ final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
+ while (step.hasNext()) {
+ self().tell(step.next(), self()); // each start traverser is sent to this same actor and handled by the Traverser.Admin case below
+ }
+ }).
+ match(Traverser.Admin.class, this::processTraverser).build()
+ );
+ }
+
+ private void processTraverser(final Traverser.Admin traverser) {
+ if (traverser.isHalted())
+ context().actorSelection("../").tell(traverser, self()); // presumably "../" resolves to the parent actor (the master that created this worker) - confirm
+ else {
+ final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId()); // the step identified by the traverser's step id
+ step.addStart(traverser);
+ while (step.hasNext()) {
+ final Traverser.Admin<?> end = step.next();
+ if (end.get() instanceof Element && !this.partition.contains((Element) end.get())) { // result is an element that this partition does not contain
+ final Partition otherPartition = this.partitioner.getPartition((Element) end.get());
+ context().actorSelection("../worker-" + otherPartition.hashCode()).tell(end, self()); // sibling lookup by name; must match the "worker-" + partition.hashCode() naming used in MasterTraversalActor
+ } else {
+ this.processTraverser(end); // non-element or locally owned result: keep processing in this actor via a recursive call
+ }
+ }
+ }
+ }
+}