You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/11 17:53:51 UTC
[43/50] [abbrv] tinkerpop git commit: refactored GraphActors
packaging -- its not actor/, but actors/. JavaDoc and various cleanups. Also,
about to NOT serialize a traversal but instead use Bytecode. Next push will do
this with TraveraslVertexProgram.
refactored GraphActors packaging -- its not actor/, but actors/. JavaDoc and various cleanups. Also, about to NOT serialize a traversal but instead use Bytecode. Next push will do this with TraveraslVertexProgram.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/3ac32164
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/3ac32164
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/3ac32164
Branch: refs/heads/TINKERPOP-1564
Commit: 3ac32164f3a5e56c9a78386f7514aafcc14cd4df
Parents: 7016301
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 4 08:01:29 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 11 10:52:48 2017 -0700
----------------------------------------------------------------------
.../gremlin/akka/jsr223/AkkaGremlinPlugin.java | 2 +-
.../akka/process/actor/ActorMailbox.java | 141 ---------------
.../akka/process/actor/AkkaGraphActors.java | 130 --------------
.../gremlin/akka/process/actor/MasterActor.java | 117 ------------
.../gremlin/akka/process/actor/WorkerActor.java | 112 ------------
.../akka/process/actors/ActorMailbox.java | 141 +++++++++++++++
.../akka/process/actors/AkkaGraphActors.java | 130 ++++++++++++++
.../akka/process/actors/MasterActor.java | 117 ++++++++++++
.../akka/process/actors/WorkerActor.java | 112 ++++++++++++
.../src/main/resources/application.conf | 6 +-
.../process/AkkaActorsProcessActorsTest.java | 34 ----
.../akka/process/AkkaActorsProvider.java | 158 ----------------
.../gremlin/akka/process/AkkaPlayTest.java | 89 ---------
.../actors/AkkaActorsProcessActorsTest.java | 33 ++++
.../akka/process/actors/AkkaActorsProvider.java | 158 ++++++++++++++++
.../akka/process/actors/AkkaPlayTest.java | 89 +++++++++
.../tinkerpop/gremlin/process/actor/Actor.java | 95 ----------
.../gremlin/process/actor/ActorProgram.java | 145 ---------------
.../gremlin/process/actor/ActorsResult.java | 30 ----
.../gremlin/process/actor/Address.java | 76 --------
.../gremlin/process/actor/GraphActors.java | 98 ----------
.../actor/traversal/TraversalActorProgram.java | 111 ------------
.../actor/traversal/TraversalMasterProgram.java | 179 -------------------
.../actor/traversal/TraversalWorkerProgram.java | 170 ------------------
.../traversal/WorkerTraversalSideEffects.java | 148 ---------------
.../traversal/message/BarrierAddMessage.java | 47 -----
.../traversal/message/BarrierDoneMessage.java | 41 -----
.../traversal/message/SideEffectAddMessage.java | 43 -----
.../traversal/message/SideEffectSetMessage.java | 42 -----
.../actor/traversal/message/StartMessage.java | 35 ----
.../actor/traversal/message/Terminate.java | 28 ---
.../step/map/TraversalActorProgramStep.java | 73 --------
.../decoration/ActorProgramStrategy.java | 94 ----------
.../verification/ActorVerificationStrategy.java | 55 ------
.../process/actor/util/DefaultActorsResult.java | 42 -----
.../process/actor/util/GraphActorsHelper.java | 48 -----
.../tinkerpop/gremlin/process/actors/Actor.java | 95 ++++++++++
.../gremlin/process/actors/ActorProgram.java | 145 +++++++++++++++
.../gremlin/process/actors/ActorsResult.java | 30 ++++
.../gremlin/process/actors/Address.java | 76 ++++++++
.../gremlin/process/actors/GraphActors.java | 107 +++++++++++
.../actors/traversal/TraversalActorProgram.java | 129 +++++++++++++
.../traversal/TraversalMasterProgram.java | 179 +++++++++++++++++++
.../traversal/TraversalWorkerProgram.java | 170 ++++++++++++++++++
.../traversal/WorkerTraversalSideEffects.java | 148 +++++++++++++++
.../traversal/message/BarrierAddMessage.java | 47 +++++
.../traversal/message/BarrierDoneMessage.java | 41 +++++
.../traversal/message/SideEffectAddMessage.java | 43 +++++
.../traversal/message/SideEffectSetMessage.java | 42 +++++
.../actors/traversal/message/StartMessage.java | 35 ++++
.../actors/traversal/message/Terminate.java | 28 +++
.../step/map/TraversalActorProgramStep.java | 73 ++++++++
.../decoration/ActorProgramStrategy.java | 94 ++++++++++
.../verification/ActorVerificationStrategy.java | 48 +++++
.../actors/util/DefaultActorsResult.java | 42 +++++
.../process/actors/util/GraphActorsHelper.java | 48 +++++
.../process/traversal/TraversalStrategies.java | 2 +-
.../gremlin/structure/util/StringFactory.java | 2 +-
.../apache/tinkerpop/gremlin/GraphManager.java | 2 +-
.../apache/tinkerpop/gremlin/GraphProvider.java | 2 +-
.../gremlin/process/actors/GraphActorsTest.java | 1 -
61 files changed, 2408 insertions(+), 2390 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/jsr223/AkkaGremlinPlugin.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/jsr223/AkkaGremlinPlugin.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/jsr223/AkkaGremlinPlugin.java
index 049c5b7..5c06bff 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/jsr223/AkkaGremlinPlugin.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/jsr223/AkkaGremlinPlugin.java
@@ -19,7 +19,7 @@
package org.apache.tinkerpop.gremlin.akka.jsr223;
-import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaGraphActors;
+import org.apache.tinkerpop.gremlin.akka.process.actors.AkkaGraphActors;
import org.apache.tinkerpop.gremlin.jsr223.AbstractGremlinPlugin;
import org.apache.tinkerpop.gremlin.jsr223.DefaultImportCustomizer;
import org.apache.tinkerpop.gremlin.jsr223.ImportCustomizer;
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java
deleted file mode 100644
index c8e5fde..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java
+++ /dev/null
@@ -1,141 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor;
-
-import akka.actor.ActorRef;
-import akka.actor.ActorSystem;
-import akka.dispatch.Envelope;
-import akka.dispatch.MailboxType;
-import akka.dispatch.MessageQueue;
-import akka.dispatch.ProducesMessageQueue;
-import com.typesafe.config.Config;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import scala.Option;
-
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Queue;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ActorMailbox implements MailboxType, ProducesMessageQueue<ActorMailbox.ActorMessageQueue> {
-
- private final List<Class> messagePriorities = new ArrayList<>();
-
- public static class ActorMessageQueue implements MessageQueue, ActorSemantics {
- private final List<Class> messagePriorities;
- private final List<Queue> messages;
- private final Object MUTEX = new Object();
-
- public ActorMessageQueue(final List<Class> messagePriorities) {
- this.messagePriorities = messagePriorities;
- this.messages = new ArrayList<>(this.messagePriorities.size());
- for (final Class clazz : this.messagePriorities) {
- final Queue queue;
- if (Traverser.class.isAssignableFrom(clazz))
- queue = new TraverserSet<>();
- else
- queue = new LinkedList<>();
- this.messages.add(queue);
- }
- }
-
- public void enqueue(final ActorRef receiver, final Envelope handle) {
- synchronized (MUTEX) {
- final Object message = handle.message();
- for (int i = 0; i < this.messagePriorities.size(); i++) {
- final Class clazz = this.messagePriorities.get(i);
- if (clazz.isInstance(message)) {
- this.messages.get(i).offer(message instanceof Traverser ? message : handle);
- return;
- }
- }
- throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass());
- }
- }
-
- public Envelope dequeue() {
- synchronized (MUTEX) {
- for (final Queue queue : this.messages) {
- if (!queue.isEmpty()) {
- final Object m = queue.poll();
- return m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m;
- }
- }
- return null;
- }
- }
-
- public int numberOfMessages() {
- synchronized (MUTEX) {
- int counter = 0;
- for (final Queue queue : this.messages) {
- counter = counter + queue.size();
- }
- return counter;
- }
- }
-
- public boolean hasMessages() {
- synchronized (MUTEX) {
- for (final Queue queue : this.messages) {
- if (!queue.isEmpty())
- return true;
- }
- return false;
- }
- }
-
- public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) {
- synchronized (MUTEX) {
- for (final Queue queue : this.messages) {
- while (!queue.isEmpty()) {
- final Object m = queue.poll();
- deadLetters.enqueue(owner, m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m);
- }
- }
- }
- }
- }
-
- // This constructor signature must exist, it will be called by Akka
- public ActorMailbox(final ActorSystem.Settings settings, final Config config) {
- try {
- final String[] messages = ((String) settings.config().getAnyRef("message-priorities")).replace("[", "").replace("]", "").split(",");
- for (final String clazz : messages) {
- this.messagePriorities.add(Class.forName(clazz.trim()));
- }
- } catch (final ClassNotFoundException e) {
- throw new IllegalArgumentException(e.getMessage(), e);
- }
- }
-
- // The create method is called to create the MessageQueue
- public MessageQueue create(final Option<ActorRef> owner, final Option<ActorSystem> system) {
- return new ActorMessageQueue(this.messagePriorities);
- }
-
- public static interface ActorSemantics {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
deleted file mode 100644
index bc692c0..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/AkkaGraphActors.java
+++ /dev/null
@@ -1,130 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor;
-
-import akka.actor.ActorSystem;
-import akka.actor.Props;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import com.typesafe.config.ConfigValueFactory;
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationUtils;
-import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actor.ActorsResult;
-import org.apache.tinkerpop.gremlin.process.actor.Address;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
-import org.apache.tinkerpop.gremlin.process.actor.util.DefaultActorsResult;
-import org.apache.tinkerpop.gremlin.process.actor.util.GraphActorsHelper;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.Collections;
-import java.util.UUID;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.Future;
-import java.util.stream.Collectors;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class AkkaGraphActors<R> implements GraphActors<R> {
-
- private ActorProgram<R> actorProgram;
- private int workers = 1;
- private Configuration configuration;
- private boolean executed = false;
-
- private AkkaGraphActors(final Configuration configuration) {
- this.configuration = new BaseConfiguration();
- ConfigurationUtils.copy(configuration, this.configuration);
- this.configuration.setProperty(GRAPH_ACTORS, AkkaGraphActors.class.getCanonicalName());
- GraphActorsHelper.configure(this, this.configuration);
- }
-
- @Override
- public String toString() {
- return StringFactory.graphActorsString(this);
- }
-
- @Override
- public GraphActors<R> program(final ActorProgram<R> actorProgram) {
- this.actorProgram = actorProgram;
- return this;
- }
-
- @Override
- public GraphActors<R> workers(final int workers) {
- this.workers = workers;
- this.configuration.setProperty(GRAPH_ACTORS_WORKERS, workers);
- return this;
- }
-
- @Override
- public GraphActors<R> configure(final String key, final Object value) {
- this.configuration.setProperty(key, value);
- return this;
- }
-
- @Override
- public Future<R> submit(final Graph graph) {
- if (this.executed)
- throw new IllegalStateException("Can not execute twice");
- this.executed = true;
- final Config config = ConfigFactory.defaultApplication().withValue("message-priorities",
- ConfigValueFactory.fromAnyRef(this.actorProgram.getMessagePriorities().
- orElse(Collections.singletonList(Object.class)).
- stream().
- map(Class::getCanonicalName).
- collect(Collectors.toList()).toString()));
- final ActorSystem system = ActorSystem.create("traversal-" + UUID.randomUUID(), config);
- final ActorsResult<R> result = new DefaultActorsResult<>();
- final Partitioner partitioner = this.workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), this.workers);
- try {
- new Address.Master(system.actorOf(Props.create(MasterActor.class, this.actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost());
- } catch (final UnknownHostException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- return CompletableFuture.supplyAsync(() -> {
- while (!system.isTerminated()) {
-
- }
- return result.getResult();
- });
- }
-
- @Override
- public Configuration configuration() {
- return this.configuration;
- }
-
- public static AkkaGraphActors open(final Configuration configuration) {
- return new AkkaGraphActors(configuration);
- }
-
- public static AkkaGraphActors open() {
- return new AkkaGraphActors(new BaseConfiguration());
- }
-}
-
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
deleted file mode 100644
index 0173a8f..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/MasterActor.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor;
-
-import akka.actor.AbstractActor;
-import akka.actor.ActorSelection;
-import akka.actor.Props;
-import akka.dispatch.RequiresMessageQueue;
-import akka.japi.pf.ReceiveBuilder;
-import org.apache.tinkerpop.gremlin.process.actor.Actor;
-import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actor.ActorsResult;
-import org.apache.tinkerpop.gremlin.process.actor.Address;
-import org.apache.tinkerpop.gremlin.structure.Partition;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
-
-import java.net.InetAddress;
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class MasterActor extends AbstractActor implements RequiresMessageQueue<ActorMailbox.ActorSemantics>, Actor.Master {
-
- private final ActorProgram.Master masterProgram;
- private final Address.Master master;
- private final List<Address.Worker> workers;
- private final Map<Address, ActorSelection> actors = new HashMap<>();
- private final ActorsResult<?> result;
- private final Partitioner partitioner;
-
- public MasterActor(final ActorProgram program, final Partitioner partitioner, final ActorsResult<?> result) {
- this.partitioner = partitioner;
- this.result = result;
- try {
- this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost());
- } catch (final UnknownHostException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- this.workers = new ArrayList<>();
- final List<Partition> partitions = partitioner.getPartitions();
- for (final Partition partition : partitions) {
- final String workerPathString = "worker-" + partition.id();
- this.workers.add(new Address.Worker(workerPathString, partition.location()));
- context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner), workerPathString);
- }
- this.masterProgram = program.createMasterProgram(this);
- receive(ReceiveBuilder.matchAny(this.masterProgram::execute).build());
- }
-
- @Override
- public void preStart() {
- this.masterProgram.setup();
- }
-
- @Override
- public void postStop() {
- this.masterProgram.terminate();
- }
-
- @Override
- public <M> void send(final Address toActor, final M message) {
- ActorSelection actor = this.actors.get(toActor);
- if (null == actor) {
- actor = context().actorSelection(toActor.getId());
- this.actors.put(toActor, actor);
- }
- actor.tell(message, self());
- }
-
- @Override
- public List<Address.Worker> workers() {
- return this.workers;
- }
-
- @Override
- public Partitioner partitioner() {
- return this.partitioner;
- }
-
- @Override
- public Address.Master address() {
- return this.master;
- }
-
- @Override
- public void close() {
- context().system().terminate();
- }
-
- @Override
- public <R> ActorsResult<R> result() {
- return (ActorsResult<R>) this.result;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
deleted file mode 100644
index 27f942a..0000000
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/WorkerActor.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process.actor;
-
-import akka.actor.AbstractActor;
-import akka.actor.ActorSelection;
-import akka.dispatch.RequiresMessageQueue;
-import akka.japi.pf.ReceiveBuilder;
-import org.apache.tinkerpop.gremlin.process.actor.Actor;
-import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actor.Address;
-import org.apache.tinkerpop.gremlin.structure.Partition;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class WorkerActor extends AbstractActor implements RequiresMessageQueue<ActorMailbox.ActorSemantics>, Actor.Worker {
-
- private final ActorProgram.Worker workerProgram;
- private final Partition localPartition;
- private final Partitioner partitioner;
- private final Address.Worker self;
- private final Address.Master master;
- private final List<Address.Worker> workers;
- private final Map<Address, ActorSelection> actors = new HashMap<>();
-
- public WorkerActor(final ActorProgram program, final Address.Master master, final Partition localPartition, final Partitioner partitioner) {
- this.localPartition = localPartition;
- this.partitioner = partitioner;
- this.self = new Address.Worker(this.createWorkerAddress(localPartition), localPartition.location());
- this.master = master;
- this.workers = new ArrayList<>();
- for (final Partition partition : partitioner.getPartitions()) {
- this.workers.add(new Address.Worker(this.createWorkerAddress(partition), partition.location()));
- }
- this.workerProgram = program.createWorkerProgram(this);
- receive(ReceiveBuilder.matchAny(this.workerProgram::execute).build());
- }
-
- @Override
- public void preStart() {
- this.workerProgram.setup();
- }
-
- @Override
- public void postStop() {
- this.workerProgram.terminate();
- }
-
- @Override
- public <M> void send(final Address toActor, final M message) {
- ActorSelection actor = this.actors.get(toActor);
- if (null == actor) {
- actor = context().actorSelection(toActor.getId());
- this.actors.put(toActor, actor);
- }
- actor.tell(message, self());
- }
-
- @Override
- public List<Address.Worker> workers() {
- return this.workers;
- }
-
- @Override
- public Partition partition() {
- return this.localPartition;
- }
-
- @Override
- public Partitioner partitioner() {
- return this.partitioner;
- }
-
- @Override
- public Address.Worker address() {
- return this.self;
- }
-
- @Override
- public Address.Master master() {
- return this.master;
- }
-
- private String createWorkerAddress(final Partition partition) {
- return "../worker-" + partition.id();
- }
-}
-
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
new file mode 100644
index 0000000..8087038
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/ActorMailbox.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actors;
+
+import akka.actor.ActorRef;
+import akka.actor.ActorSystem;
+import akka.dispatch.Envelope;
+import akka.dispatch.MailboxType;
+import akka.dispatch.MessageQueue;
+import akka.dispatch.ProducesMessageQueue;
+import com.typesafe.config.Config;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import scala.Option;
+
+import java.util.ArrayList;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Queue;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ActorMailbox implements MailboxType, ProducesMessageQueue<ActorMailbox.ActorMessageQueue> {
+
+ private final List<Class> messagePriorities = new ArrayList<>();
+
+ public static class ActorMessageQueue implements MessageQueue, ActorSemantics {
+ private final List<Class> messagePriorities;
+ private final List<Queue> messages;
+ private final Object MUTEX = new Object();
+
+ public ActorMessageQueue(final List<Class> messagePriorities) {
+ this.messagePriorities = messagePriorities;
+ this.messages = new ArrayList<>(this.messagePriorities.size());
+ for (final Class clazz : this.messagePriorities) {
+ final Queue queue;
+ if (Traverser.class.isAssignableFrom(clazz))
+ queue = new TraverserSet<>();
+ else
+ queue = new LinkedList<>();
+ this.messages.add(queue);
+ }
+ }
+
+ public void enqueue(final ActorRef receiver, final Envelope handle) {
+ synchronized (MUTEX) {
+ final Object message = handle.message();
+ for (int i = 0; i < this.messagePriorities.size(); i++) {
+ final Class clazz = this.messagePriorities.get(i);
+ if (clazz.isInstance(message)) {
+ this.messages.get(i).offer(message instanceof Traverser ? message : handle);
+ return;
+ }
+ }
+ throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass());
+ }
+ }
+
+ public Envelope dequeue() {
+ synchronized (MUTEX) {
+ for (final Queue queue : this.messages) {
+ if (!queue.isEmpty()) {
+ final Object m = queue.poll();
+ return m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m;
+ }
+ }
+ return null;
+ }
+ }
+
+ public int numberOfMessages() {
+ synchronized (MUTEX) {
+ int counter = 0;
+ for (final Queue queue : this.messages) {
+ counter = counter + queue.size();
+ }
+ return counter;
+ }
+ }
+
+ public boolean hasMessages() {
+ synchronized (MUTEX) {
+ for (final Queue queue : this.messages) {
+ if (!queue.isEmpty())
+ return true;
+ }
+ return false;
+ }
+ }
+
+ public void cleanUp(final ActorRef owner, final MessageQueue deadLetters) {
+ synchronized (MUTEX) {
+ for (final Queue queue : this.messages) {
+ while (!queue.isEmpty()) {
+ final Object m = queue.poll();
+ deadLetters.enqueue(owner, m instanceof Traverser ? new Envelope(m, ActorRef.noSender()) : (Envelope) m);
+ }
+ }
+ }
+ }
+ }
+
+ // This constructor signature must exist, it will be called by Akka
+ public ActorMailbox(final ActorSystem.Settings settings, final Config config) {
+ try {
+ final String[] messages = ((String) settings.config().getAnyRef("message-priorities")).replace("[", "").replace("]", "").split(",");
+ for (final String clazz : messages) {
+ this.messagePriorities.add(Class.forName(clazz.trim()));
+ }
+ } catch (final ClassNotFoundException e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+
+ // The create method is called to create the MessageQueue
+ public MessageQueue create(final Option<ActorRef> owner, final Option<ActorSystem> system) {
+ return new ActorMessageQueue(this.messagePriorities);
+ }
+
+ public static interface ActorSemantics {
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
new file mode 100644
index 0000000..3bd5fa6
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaGraphActors.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actors;
+
+import akka.actor.ActorSystem;
+import akka.actor.Props;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import com.typesafe.config.ConfigValueFactory;
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationUtils;
+import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
+import org.apache.tinkerpop.gremlin.process.actors.Address;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.util.DefaultActorsResult;
+import org.apache.tinkerpop.gremlin.process.actors.util.GraphActorsHelper;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.structure.util.partitioner.HashPartitioner;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.UUID;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Future;
+import java.util.stream.Collectors;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class AkkaGraphActors<R> implements GraphActors<R> {
+
+ private ActorProgram actorProgram;
+ private int workers = 1;
+ private Configuration configuration;
+ private boolean executed = false;
+
+ private AkkaGraphActors(final Configuration configuration) {
+ this.configuration = new BaseConfiguration();
+ ConfigurationUtils.copy(configuration, this.configuration);
+ this.configuration.setProperty(GRAPH_ACTORS, AkkaGraphActors.class.getCanonicalName());
+ GraphActorsHelper.configure(this, this.configuration);
+ }
+
+ @Override
+ public String toString() {
+ return StringFactory.graphActorsString(this);
+ }
+
+ @Override
+ public GraphActors<R> program(final ActorProgram actorProgram) {
+ this.actorProgram = actorProgram;
+ return this;
+ }
+
+ @Override
+ public GraphActors<R> workers(final int workers) {
+ this.workers = workers;
+ this.configuration.setProperty(GRAPH_ACTORS_WORKERS, workers);
+ return this;
+ }
+
+ @Override
+ public GraphActors<R> configure(final String key, final Object value) {
+ this.configuration.setProperty(key, value);
+ return this;
+ }
+
+ @Override
+ public Future<R> submit(final Graph graph) {
+ if (this.executed)
+ throw new IllegalStateException("Can not execute twice");
+ this.executed = true;
+ final Config config = ConfigFactory.defaultApplication().withValue("message-priorities",
+ ConfigValueFactory.fromAnyRef(this.actorProgram.getMessagePriorities().
+ orElse(Collections.singletonList(Object.class)).
+ stream().
+ map(Class::getCanonicalName).
+ collect(Collectors.toList()).toString()));
+ final ActorSystem system = ActorSystem.create("traversal-" + UUID.randomUUID(), config);
+ final ActorsResult<R> result = new DefaultActorsResult<>();
+ final Partitioner partitioner = this.workers == 1 ? graph.partitioner() : new HashPartitioner(graph.partitioner(), this.workers);
+ try {
+ new Address.Master(system.actorOf(Props.create(MasterActor.class, this.actorProgram, partitioner, result), "master").path().toString(), InetAddress.getLocalHost());
+ } catch (final UnknownHostException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ return CompletableFuture.supplyAsync(() -> {
+ while (!system.isTerminated()) {
+
+ }
+ return result.getResult();
+ });
+ }
+
+ @Override
+ public Configuration configuration() {
+ return this.configuration;
+ }
+
+ public static AkkaGraphActors open(final Configuration configuration) {
+ return new AkkaGraphActors(configuration);
+ }
+
+ public static AkkaGraphActors open() {
+ return new AkkaGraphActors(new BaseConfiguration());
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
new file mode 100644
index 0000000..97951a8
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/MasterActor.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actors;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorSelection;
+import akka.actor.Props;
+import akka.dispatch.RequiresMessageQueue;
+import akka.japi.pf.ReceiveBuilder;
+import org.apache.tinkerpop.gremlin.process.actors.Actor;
+import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
+import org.apache.tinkerpop.gremlin.process.actors.Address;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.net.InetAddress;
+import java.net.UnknownHostException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class MasterActor extends AbstractActor implements RequiresMessageQueue<ActorMailbox.ActorSemantics>, Actor.Master {
+
+ private final ActorProgram.Master masterProgram;
+ private final Address.Master master;
+ private final List<Address.Worker> workers;
+ private final Map<Address, ActorSelection> actors = new HashMap<>();
+ private final ActorsResult<?> result;
+ private final Partitioner partitioner;
+
+ public MasterActor(final ActorProgram program, final Partitioner partitioner, final ActorsResult<?> result) {
+ this.partitioner = partitioner;
+ this.result = result;
+ try {
+ this.master = new Address.Master(self().path().toString(), InetAddress.getLocalHost());
+ } catch (final UnknownHostException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ this.workers = new ArrayList<>();
+ final List<Partition> partitions = partitioner.getPartitions();
+ for (final Partition partition : partitions) {
+ final String workerPathString = "worker-" + partition.id();
+ this.workers.add(new Address.Worker(workerPathString, partition.location()));
+ context().actorOf(Props.create(WorkerActor.class, program, this.master, partition, partitioner), workerPathString);
+ }
+ this.masterProgram = program.createMasterProgram(this);
+ receive(ReceiveBuilder.matchAny(this.masterProgram::execute).build());
+ }
+
+ @Override
+ public void preStart() {
+ this.masterProgram.setup();
+ }
+
+ @Override
+ public void postStop() {
+ this.masterProgram.terminate();
+ }
+
+ @Override
+ public <M> void send(final Address toActor, final M message) {
+ ActorSelection actor = this.actors.get(toActor);
+ if (null == actor) {
+ actor = context().actorSelection(toActor.getId());
+ this.actors.put(toActor, actor);
+ }
+ actor.tell(message, self());
+ }
+
+ @Override
+ public List<Address.Worker> workers() {
+ return this.workers;
+ }
+
+ @Override
+ public Partitioner partitioner() {
+ return this.partitioner;
+ }
+
+ @Override
+ public Address.Master address() {
+ return this.master;
+ }
+
+ @Override
+ public void close() {
+ context().system().terminate();
+ }
+
+ @Override
+ public <R> ActorsResult<R> result() {
+ return (ActorsResult<R>) this.result;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java
new file mode 100644
index 0000000..7520ce4
--- /dev/null
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actors/WorkerActor.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actors;
+
+import akka.actor.AbstractActor;
+import akka.actor.ActorSelection;
+import akka.dispatch.RequiresMessageQueue;
+import akka.japi.pf.ReceiveBuilder;
+import org.apache.tinkerpop.gremlin.process.actors.Actor;
+import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actors.Address;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class WorkerActor extends AbstractActor implements RequiresMessageQueue<ActorMailbox.ActorSemantics>, Actor.Worker {
+
+ private final ActorProgram.Worker workerProgram;
+ private final Partition localPartition;
+ private final Partitioner partitioner;
+ private final Address.Worker self;
+ private final Address.Master master;
+ private final List<Address.Worker> workers;
+ private final Map<Address, ActorSelection> actors = new HashMap<>();
+
+ public WorkerActor(final ActorProgram program, final Address.Master master, final Partition localPartition, final Partitioner partitioner) {
+ this.localPartition = localPartition;
+ this.partitioner = partitioner;
+ this.self = new Address.Worker(this.createWorkerAddress(localPartition), localPartition.location());
+ this.master = master;
+ this.workers = new ArrayList<>();
+ for (final Partition partition : partitioner.getPartitions()) {
+ this.workers.add(new Address.Worker(this.createWorkerAddress(partition), partition.location()));
+ }
+ this.workerProgram = program.createWorkerProgram(this);
+ receive(ReceiveBuilder.matchAny(this.workerProgram::execute).build());
+ }
+
+ @Override
+ public void preStart() {
+ this.workerProgram.setup();
+ }
+
+ @Override
+ public void postStop() {
+ this.workerProgram.terminate();
+ }
+
+ @Override
+ public <M> void send(final Address toActor, final M message) {
+ ActorSelection actor = this.actors.get(toActor);
+ if (null == actor) {
+ actor = context().actorSelection(toActor.getId());
+ this.actors.put(toActor, actor);
+ }
+ actor.tell(message, self());
+ }
+
+ @Override
+ public List<Address.Worker> workers() {
+ return this.workers;
+ }
+
+ @Override
+ public Partition partition() {
+ return this.localPartition;
+ }
+
+ @Override
+ public Partitioner partitioner() {
+ return this.partitioner;
+ }
+
+ @Override
+ public Address.Worker address() {
+ return this.self;
+ }
+
+ @Override
+ public Address.Master master() {
+ return this.master;
+ }
+
+ private String createWorkerAddress(final Partition partition) {
+ return "../worker-" + partition.id();
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/resources/application.conf b/akka-gremlin/src/main/resources/application.conf
index 7ced92c..7ee599a 100644
--- a/akka-gremlin/src/main/resources/application.conf
+++ b/akka-gremlin/src/main/resources/application.conf
@@ -1,9 +1,9 @@
custom-dispatcher {
- mailbox-requirement = "org.apache.tinkerpop.gremlin.akka.process.actor.ActorMailbox$ActorSemantics"
+ mailbox-requirement = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics"
}
akka.actor.mailbox.requirements {
- "org.apache.tinkerpop.gremlin.akka.process.actor.ActorMailbox$ActorSemantics" = custom-dispatcher-mailbox
+ "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox$ActorSemantics" = custom-dispatcher-mailbox
}
akka {
@@ -11,5 +11,5 @@ akka {
}
custom-dispatcher-mailbox {
- mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actor.ActorMailbox"
+ mailbox-type = "org.apache.tinkerpop.gremlin.akka.process.actors.ActorMailbox"
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessActorsTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessActorsTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessActorsTest.java
deleted file mode 100644
index 2c1aa57..0000000
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProcessActorsTest.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process;
-
-import org.apache.tinkerpop.gremlin.GraphProviderClass;
-import org.apache.tinkerpop.gremlin.process.ProcessActorsSuite;
-import org.apache.tinkerpop.gremlin.process.ProcessStandardSuite;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.junit.runner.RunWith;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-@RunWith(ProcessActorsSuite.class)
-@GraphProviderClass(provider = AkkaActorsProvider.class, graph = TinkerGraph.class)
-public class AkkaActorsProcessActorsTest {
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
deleted file mode 100644
index 6756e0c..0000000
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
-import org.apache.tinkerpop.gremlin.LoadGraphWith;
-import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaGraphActors;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.process.traversal.step.ComplexTest;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphTest;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProgramTest;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupTest;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectTest;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.VertexProperty;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerEdge;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerElement;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraphVariables;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerProperty;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertexProperty;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Random;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class AkkaActorsProvider extends AbstractGraphProvider {
-
- private static final Random RANDOM = new Random();
-
- private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList(
- "g_V_hasLabelXpersonX_V_hasLabelXsoftwareX_name",
- "g_VX1X_repeatXbothEXcreatedX_whereXwithoutXeXX_aggregateXeX_otherVX_emit_path",
- "g_withBulkXfalseX_withSackX1_sumX_V_out_barrier_sack",
- "g_V_both_groupCountXaX_out_capXaX_selectXkeysX_unfold_both_groupCountXaX_capXaX",
- GraphTest.Traversals.class.getCanonicalName(),
- GroupTest.Traversals.class.getCanonicalName(),
- ComplexTest.Traversals.class.getCanonicalName(),
- SubgraphTest.Traversals.class.getCanonicalName(),
- SideEffectTest.Traversals.class.getCanonicalName(),
- SubgraphStrategyProcessTest.class.getCanonicalName(),
- ProfileTest.Traversals.class.getCanonicalName(),
- PartitionStrategyProcessTest.class.getCanonicalName(),
- EventStrategyProcessTest.class.getCanonicalName(),
- ElementIdStrategyProcessTest.class.getCanonicalName(),
- TraversalInterruptionTest.class.getCanonicalName(),
- ProgramTest.Traversals.class.getCanonicalName()));
-
- private static final Set<Class> IMPLEMENTATION = new HashSet<Class>() {{
- add(TinkerEdge.class);
- add(TinkerElement.class);
- add(TinkerGraph.class);
- add(TinkerGraphVariables.class);
- add(TinkerProperty.class);
- add(TinkerVertex.class);
- add(TinkerVertexProperty.class);
- }};
-
- @Override
- public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName,
- final LoadGraphWith.GraphData loadGraphWith) {
-
- final TinkerGraph.DefaultIdManager idManager = selectIdMakerFromGraphData(loadGraphWith);
- final String idMaker = (idManager.equals(TinkerGraph.DefaultIdManager.ANY) ? selectIdMakerFromGraphData(loadGraphWith) : idManager).name();
- return new HashMap<String, Object>() {{
- put(Graph.GRAPH, TinkerGraph.class.getName());
- put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_ID_MANAGER, idMaker);
- put(TinkerGraph.GREMLIN_TINKERGRAPH_EDGE_ID_MANAGER, idMaker);
- put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_PROPERTY_ID_MANAGER, idMaker);
- put("skipTest", SKIP_TESTS.contains(testMethodName) || SKIP_TESTS.contains(test.getCanonicalName()));
- if (loadGraphWith == LoadGraphWith.GraphData.CREW)
- put(TinkerGraph.GREMLIN_TINKERGRAPH_DEFAULT_VERTEX_PROPERTY_CARDINALITY, VertexProperty.Cardinality.list.name());
- }};
- }
-
- @Override
- public void clear(final Graph graph, final Configuration configuration) throws Exception {
- if (graph != null) graph.close();
- }
-
- @Override
- public Set<Class> getImplementations() {
- return IMPLEMENTATION;
- }
-
- /**
- * Test that load with specific graph data can be configured with a specific id manager as the data type to
- * be used in the test for that graph is known.
- */
- protected TinkerGraph.DefaultIdManager selectIdMakerFromGraphData(final LoadGraphWith.GraphData loadGraphWith) {
- if (null == loadGraphWith) return TinkerGraph.DefaultIdManager.ANY;
- if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC))
- return TinkerGraph.DefaultIdManager.INTEGER;
- else if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN))
- return TinkerGraph.DefaultIdManager.INTEGER;
- else if (loadGraphWith.equals(LoadGraphWith.GraphData.CREW))
- return TinkerGraph.DefaultIdManager.INTEGER;
- else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL))
- return TinkerGraph.DefaultIdManager.INTEGER;
- else
- throw new IllegalStateException(String.format("Need to define a new %s for %s", TinkerGraph.IdManager.class.getName(), loadGraphWith.name()));
- }
-
-/////////////////////////////
-/////////////////////////////
-/////////////////////////////
-
- @Override
- public GraphTraversalSource traversal(final Graph graph) {
- if ((Boolean) graph.configuration().getProperty("skipTest"))
- return graph.traversal();
- //throw new VerificationException("This test current does not work with Gremlin-Python", EmptyTraversal.instance());
- else {
- final GraphTraversalSource g = graph.traversal();
- return RANDOM.nextBoolean() ?
- g.withProcessor(AkkaGraphActors.open().workers(new Random().nextInt(15) + 1)) :
- g.withProcessor(GraphActors.open(AkkaGraphActors.class));
- }
- }
-
- @Override
- public GraphActors getGraphActors(final Graph graph) {
- return AkkaGraphActors.open().workers(new Random().nextInt(15) + 1);
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
deleted file mode 100644
index df40748..0000000
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaPlayTest.java
+++ /dev/null
@@ -1,89 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.akka.process;
-
-import org.apache.tinkerpop.gremlin.akka.process.actor.AkkaGraphActors;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
-import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
-import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in;
-import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public class AkkaPlayTest {
-
- @Test
- @Ignore
- public void testPlay1() throws Exception {
- final Graph graph = TinkerGraph.open();
- graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo");
- GraphTraversalSource g = graph.traversal().withProcessor(GraphActors.open(AkkaGraphActors.class).workers(3));
- // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList());
-
- for (int i = 0; i < 1000; i++) {
- if (12l != g.V().union(out(), in()).values("name").count().next())
- System.out.println(i);
- }
-
- //3, 1.9, 1
- /*for (int i = 0; i < 10000; i++) {
- final Graph graph = TinkerGraph.open();
- graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo");
- final GraphTraversalSource g = graph.traversal().withComputer();
- final List<Pair<Integer, Traversal.Admin<?, ?>>> traversals = Arrays.asList(
- // match() works
- Pair.with(6, g.V().match(
- as("a").out("created").as("b"),
- as("b").in("created").as("c"),
- as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin()),
- // side-effects work
- Pair.with(3, g.V().repeat(both()).times(2).
- groupCount("a").by("name").
- cap("a").unfold().order().by(Column.values, Order.decr).limit(3).asAdmin()),
- // barriers work and beyond the local star graph works
- Pair.with(1, g.V().repeat(both()).times(2).hasLabel("person").
- group().
- by("name").
- by(out("created").values("name").dedup().fold()).asAdmin()),
- // no results works
- Pair.with(0, g.V().out("blah").asAdmin())
- );
- for (final Pair<Integer,Traversal.Admin<?, ?>> pair : traversals) {
- final Integer count = pair.getValue0();
- final Traversal.Admin<?,?> traversal = pair.getValue1();
- System.out.println("EXECUTING: " + traversal.getBytecode());
- final TinkerActorSystem<?,?> actors = new TinkerActorSystem<>(traversal.clone(),new HashPartitioner(graph.partitioner(), 3));
- System.out.println(IteratorUtils.asList(actors.getResults().get()));
- if(IteratorUtils.count(actors.getResults().get()) != count)
- throw new IllegalStateException();
- System.out.println("//////////////////////////////////\n");
- }
- }
- }*/
-
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProcessActorsTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProcessActorsTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProcessActorsTest.java
new file mode 100644
index 0000000..e0feef0
--- /dev/null
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProcessActorsTest.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actors;
+
+import org.apache.tinkerpop.gremlin.GraphProviderClass;
+import org.apache.tinkerpop.gremlin.process.ProcessActorsSuite;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.junit.runner.RunWith;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+@RunWith(ProcessActorsSuite.class)
+@GraphProviderClass(provider = AkkaActorsProvider.class, graph = TinkerGraph.class)
+public class AkkaActorsProcessActorsTest {
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
new file mode 100644
index 0000000..4168445
--- /dev/null
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaActorsProvider.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actors;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.AbstractGraphProvider;
+import org.apache.tinkerpop.gremlin.LoadGraphWith;
+import org.apache.tinkerpop.gremlin.akka.process.actors.AkkaGraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalInterruptionTest;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.process.traversal.step.ComplexTest;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphTest;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProfileTest;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.ProgramTest;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.GroupTest;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectTest;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphTest;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ElementIdStrategyProcessTest;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.EventStrategyProcessTest;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.PartitionStrategyProcessTest;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.SubgraphStrategyProcessTest;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerEdge;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerElement;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraphVariables;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerProperty;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertex;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerVertexProperty;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class AkkaActorsProvider extends AbstractGraphProvider {
+
+ private static final Random RANDOM = new Random();
+
+ private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList(
+ "g_V_hasLabelXpersonX_V_hasLabelXsoftwareX_name",
+ "g_VX1X_repeatXbothEXcreatedX_whereXwithoutXeXX_aggregateXeX_otherVX_emit_path",
+ "g_withBulkXfalseX_withSackX1_sumX_V_out_barrier_sack",
+ "g_V_both_groupCountXaX_out_capXaX_selectXkeysX_unfold_both_groupCountXaX_capXaX",
+ GraphTest.Traversals.class.getCanonicalName(),
+ GroupTest.Traversals.class.getCanonicalName(),
+ ComplexTest.Traversals.class.getCanonicalName(),
+ SubgraphTest.Traversals.class.getCanonicalName(),
+ SideEffectTest.Traversals.class.getCanonicalName(),
+ SubgraphStrategyProcessTest.class.getCanonicalName(),
+ ProfileTest.Traversals.class.getCanonicalName(),
+ PartitionStrategyProcessTest.class.getCanonicalName(),
+ EventStrategyProcessTest.class.getCanonicalName(),
+ ElementIdStrategyProcessTest.class.getCanonicalName(),
+ TraversalInterruptionTest.class.getCanonicalName(),
+ ProgramTest.Traversals.class.getCanonicalName()));
+
+ private static final Set<Class> IMPLEMENTATION = new HashSet<Class>() {{
+ add(TinkerEdge.class);
+ add(TinkerElement.class);
+ add(TinkerGraph.class);
+ add(TinkerGraphVariables.class);
+ add(TinkerProperty.class);
+ add(TinkerVertex.class);
+ add(TinkerVertexProperty.class);
+ }};
+
+ @Override
+ public Map<String, Object> getBaseConfiguration(final String graphName, final Class<?> test, final String testMethodName,
+ final LoadGraphWith.GraphData loadGraphWith) {
+
+ final TinkerGraph.DefaultIdManager idManager = selectIdMakerFromGraphData(loadGraphWith);
+ final String idMaker = (idManager.equals(TinkerGraph.DefaultIdManager.ANY) ? selectIdMakerFromGraphData(loadGraphWith) : idManager).name();
+ return new HashMap<String, Object>() {{
+ put(Graph.GRAPH, TinkerGraph.class.getName());
+ put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_ID_MANAGER, idMaker);
+ put(TinkerGraph.GREMLIN_TINKERGRAPH_EDGE_ID_MANAGER, idMaker);
+ put(TinkerGraph.GREMLIN_TINKERGRAPH_VERTEX_PROPERTY_ID_MANAGER, idMaker);
+ put("skipTest", SKIP_TESTS.contains(testMethodName) || SKIP_TESTS.contains(test.getCanonicalName()));
+ if (loadGraphWith == LoadGraphWith.GraphData.CREW)
+ put(TinkerGraph.GREMLIN_TINKERGRAPH_DEFAULT_VERTEX_PROPERTY_CARDINALITY, VertexProperty.Cardinality.list.name());
+ }};
+ }
+
+ @Override
+ public void clear(final Graph graph, final Configuration configuration) throws Exception {
+ if (graph != null) graph.close();
+ }
+
+ @Override
+ public Set<Class> getImplementations() {
+ return IMPLEMENTATION;
+ }
+
+ /**
+ * Test that load with specific graph data can be configured with a specific id manager as the data type to
+ * be used in the test for that graph is known.
+ */
+ protected TinkerGraph.DefaultIdManager selectIdMakerFromGraphData(final LoadGraphWith.GraphData loadGraphWith) {
+ if (null == loadGraphWith) return TinkerGraph.DefaultIdManager.ANY;
+ if (loadGraphWith.equals(LoadGraphWith.GraphData.CLASSIC))
+ return TinkerGraph.DefaultIdManager.INTEGER;
+ else if (loadGraphWith.equals(LoadGraphWith.GraphData.MODERN))
+ return TinkerGraph.DefaultIdManager.INTEGER;
+ else if (loadGraphWith.equals(LoadGraphWith.GraphData.CREW))
+ return TinkerGraph.DefaultIdManager.INTEGER;
+ else if (loadGraphWith.equals(LoadGraphWith.GraphData.GRATEFUL))
+ return TinkerGraph.DefaultIdManager.INTEGER;
+ else
+ throw new IllegalStateException(String.format("Need to define a new %s for %s", TinkerGraph.IdManager.class.getName(), loadGraphWith.name()));
+ }
+
+/////////////////////////////
+/////////////////////////////
+/////////////////////////////
+
+ @Override
+ public GraphTraversalSource traversal(final Graph graph) {
+ if ((Boolean) graph.configuration().getProperty("skipTest"))
+ return graph.traversal();
+ //throw new VerificationException("This test current does not work with Gremlin-Python", EmptyTraversal.instance());
+ else {
+ final GraphTraversalSource g = graph.traversal();
+ return RANDOM.nextBoolean() ?
+ g.withProcessor(AkkaGraphActors.open().workers(new Random().nextInt(15) + 1)) :
+ g.withProcessor(GraphActors.open(AkkaGraphActors.class));
+ }
+ }
+
+ @Override
+ public GraphActors getGraphActors(final Graph graph) {
+ return AkkaGraphActors.open().workers(new Random().nextInt(15) + 1);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
new file mode 100644
index 0000000..d4562eb
--- /dev/null
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/actors/AkkaPlayTest.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.akka.process.actors;
+
+import org.apache.tinkerpop.gremlin.akka.process.actors.AkkaGraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
+import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.io.gryo.GryoIo;
+import org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.in;
+import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.out;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class AkkaPlayTest {
+
+ @Test
+ @Ignore
+ public void testPlay1() throws Exception {
+ final Graph graph = TinkerGraph.open();
+ graph.io(GryoIo.build()).readGraph("../data/tinkerpop-modern.kryo");
+ GraphTraversalSource g = graph.traversal().withProcessor(GraphActors.open(AkkaGraphActors.class).workers(3));
+ // System.out.println(g.V().group().by("name").by(outE().values("weight").fold()).toList());
+
+ for (int i = 0; i < 1000; i++) {
+ if (12l != g.V().union(out(), in()).values("name").count().next())
+ System.out.println(i);
+ }
+
+ //3, 1.9, 1
+ /*for (int i = 0; i < 10000; i++) {
+ final Graph graph = TinkerGraph.open();
+ graph.io(GryoIo.build()).readGraph("data/tinkerpop-modern.kryo");
+ final GraphTraversalSource g = graph.traversal().withComputer();
+ final List<Pair<Integer, Traversal.Admin<?, ?>>> traversals = Arrays.asList(
+ // match() works
+ Pair.with(6, g.V().match(
+ as("a").out("created").as("b"),
+ as("b").in("created").as("c"),
+ as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin()),
+ // side-effects work
+ Pair.with(3, g.V().repeat(both()).times(2).
+ groupCount("a").by("name").
+ cap("a").unfold().order().by(Column.values, Order.decr).limit(3).asAdmin()),
+ // barriers work and beyond the local star graph works
+ Pair.with(1, g.V().repeat(both()).times(2).hasLabel("person").
+ group().
+ by("name").
+ by(out("created").values("name").dedup().fold()).asAdmin()),
+ // no results works
+ Pair.with(0, g.V().out("blah").asAdmin())
+ );
+ for (final Pair<Integer,Traversal.Admin<?, ?>> pair : traversals) {
+ final Integer count = pair.getValue0();
+ final Traversal.Admin<?,?> traversal = pair.getValue1();
+ System.out.println("EXECUTING: " + traversal.getBytecode());
+ final TinkerActorSystem<?,?> actors = new TinkerActorSystem<>(traversal.clone(),new HashPartitioner(graph.partitioner(), 3));
+ System.out.println(IteratorUtils.asList(actors.getResults().get()));
+ if(IteratorUtils.count(actors.getResults().get()) != count)
+ throw new IllegalStateException();
+ System.out.println("//////////////////////////////////\n");
+ }
+ }
+ }*/
+
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
deleted file mode 100644
index 5a0b869..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Actor.java
+++ /dev/null
@@ -1,95 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor;
-
-import org.apache.tinkerpop.gremlin.structure.Partition;
-import org.apache.tinkerpop.gremlin.structure.Partitioner;
-
-import java.util.List;
-
-/**
- * An Actor represents an isolated processing unit that can only be interacted with via messages.
- * Actors are able to send and receive messages. The {@link GraphActors} framework has two types of actors:
- * {@link Master} and {@link Worker}. A master actor is not associated with a particular graph {@link Partition}.
- * Instead, its role is to coordinate the workers and ultimately, yield the final result of the submitted
- * {@link ActorProgram}.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface Actor {
-
- /**
- * Get the {@link Partitioner} associated with the {@link GraphActors} system.
- *
- * @return the partitioner used to partition (logically and/or physically) the {@link org.apache.tinkerpop.gremlin.structure.Graph}
- */
- public Partitioner partitioner();
-
- /**
- * Get the {@link Address} of the actor.
- *
- * @return the actor's address
- */
- public Address address();
-
- /**
- * Get a list of the {@link Address} values of all the workers in {@link GraphActors} system.
- *
- * @return the worker's addresses
- */
- public List<Address.Worker> workers();
-
- /**
- * Send a message from this actor to another actor given their {@link Address}.
- *
- * @param toActor the actor to receive the messages
- * @param message the message being sent
- * @param <M> the message type
- */
- public <M> void send(final Address toActor, final M message);
-
- public interface Master extends Actor {
-
- public Address.Master address();
-
- public void close();
-
- public <R> ActorsResult<R> result();
-
- }
-
- public interface Worker extends Actor {
-
- public Address.Worker address();
-
- public Address.Master master();
-
- /**
- * Get the {@link Partition} associated with this worker.
- * In principle, this is the subset of the {@link org.apache.tinkerpop.gremlin.structure.Graph} that
- * the worker is "data-local" to.
- *
- * @return the worker's partition
- */
- public Partition partition();
- }
-
-
-}