You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/23 21:24:15 UTC
[19/50] [abbrv] tinkerpop git commit: refactored GraphActors
packaging -- it's not actor/, but actors/. JavaDoc and various cleanups. Also,
about to NOT serialize a traversal but instead use Bytecode. Next push will do
this with TraversalVertexProgram.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
deleted file mode 100644
index 89002fe..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorProgram.java
+++ /dev/null
@@ -1,145 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-
-import java.util.List;
-import java.util.Optional;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface ActorProgram<M> extends Cloneable {
-
- public static final String ACTOR_PROGRAM = "gremlin.actorProgram";
-
- /**
- * When it is necessary to store the state of the ActorProgram, this method is called.
- * This is typically required when the ActorProgram needs to be serialized to another machine.
- * Note that what is stored is simply the instance/configuration state, not any processed data.
- * The default implementation provided simply stores the ActorProgram class name for reflective reconstruction.
- * It is typically a good idea to call ActorProgram.super.storeState().
- *
- * @param configuration the configuration to store the state of the ActorProgram in.
- */
- public default void storeState(final Configuration configuration) {
- configuration.setProperty(ACTOR_PROGRAM, this.getClass().getName());
- }
-
- /**
- * When it is necessary to load the state of the ActorProgram, this method is called.
- * This is typically required when the ActorProgram needs to be serialized to another machine.
- * Note that what is loaded is simply the instance state, not any processed data.
- *
- * @param graph the graph that the ActorProgram will run against
- * @param configuration the configuration to load the state of the ActorProgram from.
- */
- public default void loadState(final Graph graph, final Configuration configuration) {
-
- }
-
- /**
- * Create the {@link org.apache.tinkerpop.gremlin.process.actor.Actor.Worker} program.
- * This is typically used by {@link Worker} to spawn its program.
- *
- * @param worker the worker actor creating the worker program
- * @return the worker program
- */
- public ActorProgram.Worker createWorkerProgram(final Actor.Worker worker);
-
- /**
- * Create the {@link org.apache.tinkerpop.gremlin.process.actor.Actor.Master} program.
- * This is typically used by {@link Master} to spawn its program.
- *
- * @param master the master actor creating the master program
- * @return the master program
- */
- public ActorProgram.Master createMasterProgram(final Actor.Master master);
-
- /**
- * Get the ordered list of message classes where order determines the priority
- * of message reception by the respective {@link Actor}. For instance,
- * if an {@link Actor} has a message of type {@code X} and a message of type {@code Y}
- * in its message buffer, and {@code X} has a higher priority, it will be fetched
- * first from the buffer. If no list is provided then it is FIFO.
- * The default implementation returns an {@link Optional#empty()}.
- *
- * @return the optional ordered list of message priorities.
- */
- public default Optional<List<Class>> getMessagePriorities() {
- return Optional.empty();
- }
-
- /**
- * When multiple workers on a single machine need ActorProgram instances, it is possible to use clone.
- * This will provide a speedier way of generating instances, over the {@link ActorProgram#storeState} and {@link ActorProgram#loadState} model.
- * The default implementation simply returns the object as it assumes that the ActorProgram instance is a stateless singleton.
- *
- * @return A clone of the ActorProgram object
- */
- @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
- public ActorProgram<M> clone();
-
- /**
- * The Worker program is executed by a worker process in the {@link GraphActors} system.
- * There are many workers and a single master.
- * All workers execute the same program.
- *
- * @param <M> The message type accepted by the worker
- */
- public static interface Worker<M> {
-
- /**
- * This method is evaluated when the worker process is spawned.
- */
- public void setup();
-
- /**
- * This method is evaluated when the worker receives a new message.
- *
- * @param message the received message
- */
- public void execute(final M message);
-
- /**
- * This method is evaluated when the worker process is destroyed.
- */
- public void terminate();
-
- }
-
- /**
- * The Master program is executed by the master process in the {@link GraphActors} system.
- * There are many workers and a single master.
- *
- * @param <M> The message type accepted by the master
- */
- public static interface Master<M> {
- public void setup();
-
- public void execute(final M message);
-
- public void terminate();
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorsResult.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorsResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorsResult.java
deleted file mode 100644
index c9db36a..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/ActorsResult.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface ActorsResult<R> {
-
- public R getResult();
-
- public void setResult(final R result);
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
deleted file mode 100644
index 9f59e5e..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/Address.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor;
-
-import java.io.Serializable;
-import java.net.InetAddress;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public abstract class Address implements Serializable {
-
- private final String id;
- private final InetAddress location;
-
- public Address(final String id, final InetAddress location) {
- this.id = id;
- this.location = location;
- }
-
- public InetAddress getLocation() {
- return this.location;
- }
-
- public String getId() {
- return this.id;
- }
-
- @Override
- public boolean equals(final Object other) {
- return other instanceof Address && ((Address) other).id.equals(this.id);
- }
-
- @Override
- public int hashCode() {
- return this.id.hashCode();
- }
-
- @Override
- public String toString() {
- return this.id;
- }
-
- public static final class Master extends Address {
-
- public Master(final String id, final InetAddress location) {
- super(id, location);
- }
-
- }
-
- public static final class Worker extends Address {
-
- public Worker(final String id, final InetAddress location) {
- super(id, location);
- }
-
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
deleted file mode 100644
index 51f4c4a..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/GraphActors.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor;
-
-import org.apache.commons.configuration.BaseConfiguration;
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.Processor;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration.ActorProgramStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.ProcessorTraversalStrategy;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-
-import java.util.concurrent.Future;
-
-/**
- * GraphActors is a message-passing based graph {@link Processor} that is:
- * asynchronous, distributed, and partition centric.
- *
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public interface GraphActors<R> extends Processor {
-
- public static final String GRAPH_ACTORS = "gremlin.graphActors";
- public static final String GRAPH_ACTORS_WORKERS = "gremlin.graphActors.workers";
-
- /**
- * Provide the {@link ActorProgram} that the GraphActors will execute.
- *
- * @param program the program to execute
- * @return the updated GraphActors with newly defined program
- */
- public GraphActors<R> program(final ActorProgram<R> program);
-
- /**
- * Specify the number of workers per {@link Graph} {@link org.apache.tinkerpop.gremlin.structure.Partition}.
- *
- * @param workers the number of workers per partition
- * @return the updated GraphActors with newly defined workers
- */
- public GraphActors<R> workers(final int workers);
-
- /**
- * Add an arbitrary configuration to the GraphActors system.
- * Typically, these configurations are provider-specific and do not generalize across all GraphActor implementations.
- *
- * @param key the key of the configuration
- * @param value the value of the configuration
- * @return the updated GraphActors with newly defined configuration
- */
- public GraphActors<R> configure(final String key, final Object value);
-
- /**
- * Submit the {@link ActorProgram} for execution by the {@link GraphActors}.
- *
- * @return a {@link Future} denoting a reference to the asynchronous computation's result
- */
- public Future<R> submit(final Graph graph);
-
- /**
- * Returns an {@link ActorProgramStrategy} which enables a {@link Traversal} to execute on {@link GraphActors}.
- *
- * @return a traversal strategy capable of executing traversals on a GraphActors
- */
- public default ProcessorTraversalStrategy<GraphActors> getProcessorTraversalStrategy() {
- return new ActorProgramStrategy(this);
- }
-
- public static <A extends GraphActors> A open(final Configuration configuration) {
- try {
- return (A) Class.forName(configuration.getString(GRAPH_ACTORS)).getMethod("open", Configuration.class).invoke(null, configuration);
- } catch (final Exception e) {
- throw new IllegalArgumentException(e.getMessage(), e);
- }
- }
-
- public static <A extends GraphActors> A open(final Class<A> graphActorsClass) {
- final BaseConfiguration configuration = new BaseConfiguration();
- configuration.setProperty(GRAPH_ACTORS, graphActorsClass.getCanonicalName());
- return GraphActors.open(configuration);
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
deleted file mode 100644
index b584322..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalActorProgram.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.traversal;
-
-import org.apache.tinkerpop.gremlin.process.actor.Actor;
-import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.Terminate;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration.ActorProgramStrategy;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification.ActorVerificationStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.LazyBarrierStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.MatchPredicateStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.PathRetractionStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.RepeatUnrollStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Optional;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class TraversalActorProgram<R> implements ActorProgram<TraverserSet<R>> {
-
- private static final List<Class> MESSAGE_PRIORITIES = Arrays.asList(
- StartMessage.class,
- Traverser.class,
- SideEffectAddMessage.class,
- BarrierAddMessage.class,
- SideEffectSetMessage.class,
- BarrierDoneMessage.class,
- Terminate.class);
-
- private Traversal.Admin<?, R> traversal;
- public TraverserSet<R> result = new TraverserSet<>();
-
- public TraversalActorProgram(final Traversal.Admin<?, R> traversal) {
- this.traversal = traversal;
- final TraversalStrategies strategies = this.traversal.getStrategies().clone();
- strategies.addStrategies(ActorVerificationStrategy.instance(), ReadOnlyStrategy.instance());
- // TODO: make TinkerGraph/etc. strategies smart about actors
- new ArrayList<>(strategies.toList()).stream().
- filter(s -> s instanceof TraversalStrategy.ProviderOptimizationStrategy).
- map(TraversalStrategy::getClass).
- forEach(strategies::removeStrategies);
- strategies.removeStrategies(
- ActorProgramStrategy.class,
- LazyBarrierStrategy.class,
- RepeatUnrollStrategy.class,
- MatchPredicateStrategy.class,
- InlineFilterStrategy.class,
- PathRetractionStrategy.class);
- this.traversal.setStrategies(strategies);
- this.traversal.applyStrategies();
- }
-
- @Override
- public Worker createWorkerProgram(final Actor.Worker worker) {
- return new TraversalWorkerProgram<>(worker, this.traversal.clone());
- }
-
- @Override
- public Master createMasterProgram(final Actor.Master master) {
- return new TraversalMasterProgram<>(master, this.traversal.clone(), this.result);
- }
-
- @Override
- public Optional<List<Class>> getMessagePriorities() {
- return Optional.of(MESSAGE_PRIORITIES);
- }
-
- @Override
- public ActorProgram<TraverserSet<R>> clone() {
- try {
- final TraversalActorProgram<R> clone = (TraversalActorProgram<R>) super.clone();
- clone.traversal = this.traversal.clone();
- return clone;
- } catch (final CloneNotSupportedException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
deleted file mode 100644
index 2aaf686..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
+++ /dev/null
@@ -1,179 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.traversal;
-
-import org.apache.tinkerpop.gremlin.process.actor.Actor;
-import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actor.Address;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.Terminate;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
-import org.apache.tinkerpop.gremlin.structure.Element;
-import org.apache.tinkerpop.gremlin.structure.Partition;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
-
- private final Actor.Master master;
- private final Traversal.Admin<?, ?> traversal;
- private final TraversalMatrix<?, ?> matrix;
- private Map<String, Barrier> barriers = new HashMap<>();
- private final TraverserSet<?> results;
- private Address.Worker leaderWorker;
- private int orderCounter = -1;
- private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
-
- public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final TraverserSet<?> results) {
- this.traversal = traversal;
- // System.out.println("master[created]: " + master.address().getId());
- // System.out.println(this.traversal);
- this.matrix = new TraversalMatrix<>(this.traversal);
- this.results = results;
- this.master = master;
- Distributing.configure(this.traversal, true, true);
- Pushing.configure(this.traversal, true, false);
- }
-
- @Override
- public void setup() {
- this.leaderWorker = this.master.workers().get(0);
- for (int i = 0; i < this.master.partitioner().getPartitions().size(); i++) {
- this.partitionToWorkerMap.put(this.master.partitioner().getPartitions().get(i), this.master.workers().get(i));
- }
- this.broadcast(StartMessage.instance());
- this.master.send(this.leaderWorker, Terminate.MAYBE);
- }
-
- @Override
- public void execute(final M message) {
- if (message instanceof Traverser.Admin) {
- this.processTraverser((Traverser.Admin) message);
- } else if (message instanceof BarrierAddMessage) {
- final Barrier barrier = (Barrier) this.matrix.getStepById(((BarrierAddMessage) message).getStepId());
- final Step<?, ?> step = (Step) barrier;
- barrier.addBarrier(((BarrierAddMessage) message).getBarrier());
- this.barriers.put(step.getId(), barrier);
- } else if (message instanceof SideEffectAddMessage) {
- this.traversal.getSideEffects().add(((SideEffectAddMessage) message).getKey(), ((SideEffectAddMessage) message).getValue());
- } else if (message instanceof Terminate) {
- assert Terminate.YES == message;
- if (!this.barriers.isEmpty()) {
- for (final Barrier barrier : this.barriers.values()) {
- final Step<?, ?> step = (Step) barrier;
- if (!(barrier instanceof LocalBarrier)) {
- this.orderBarrier(step);
- if (step instanceof OrderGlobalStep) this.orderCounter = 0;
- while (step.hasNext()) {
- this.sendTraverser(-1 == this.orderCounter ?
- step.next() :
- new OrderedTraverser<>(step.next(), this.orderCounter++));
- }
- } else {
- if (step instanceof SideEffectCapable) {
- final String key = ((SideEffectCapable) step).getSideEffectKey();
- this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key)));
- }
- this.broadcast(new BarrierDoneMessage(barrier));
- barrier.done();
- }
- }
- this.barriers.clear();
- this.master.send(this.leaderWorker, Terminate.MAYBE);
- } else {
- while (this.traversal.hasNext()) {
- this.results.add((Traverser.Admin) this.traversal.nextTraverser());
- }
- if (this.orderCounter != -1)
- this.results.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order()));
-
- this.master.close();
- }
- } else {
- throw new IllegalStateException("Unknown message:" + message);
- }
- }
-
- @Override
- public void terminate() {
- this.master.result().setResult(this.results);
- }
-
- private void broadcast(final Object message) {
- for (final Address.Worker worker : this.master.workers()) {
- this.master.send(worker, message);
- }
- }
-
- private void processTraverser(final Traverser.Admin traverser) {
- if (traverser.isHalted() || traverser.get() instanceof Element) {
- this.sendTraverser(traverser);
- } else {
- final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
- step.addStart(traverser);
- if (step instanceof Barrier) {
- this.barriers.put(step.getId(), (Barrier) step);
- } else {
- while (step.hasNext()) {
- this.processTraverser(step.next());
- }
- }
- }
- }
-
- private void sendTraverser(final Traverser.Admin traverser) {
- if (traverser.isHalted())
- this.results.add(traverser);
- else if (traverser.get() instanceof Element)
- this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element) traverser.get())), traverser);
- else
- this.master.send(this.master.address(), traverser);
- }
-
- private void orderBarrier(final Step step) {
- if (this.orderCounter != -1 && step instanceof Barrier && (step instanceof RangeGlobalStep || step instanceof TailGlobalStep)) {
- final Barrier barrier = (Barrier) step;
- final TraverserSet<?> rangingBarrier = (TraverserSet<?>) barrier.nextBarrier();
- rangingBarrier.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order()));
- barrier.addBarrier(rangingBarrier);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
deleted file mode 100644
index 001219a..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalWorkerProgram.java
+++ /dev/null
@@ -1,170 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.traversal;
-
-import org.apache.tinkerpop.gremlin.process.actor.Actor;
-import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actor.Address;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierAddMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.BarrierDoneMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectSetMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.StartMessage;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.Terminate;
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
-import org.apache.tinkerpop.gremlin.structure.Edge;
-import org.apache.tinkerpop.gremlin.structure.Element;
-import org.apache.tinkerpop.gremlin.structure.Partition;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-final class TraversalWorkerProgram<M> implements ActorProgram.Worker<M> {
-
- private final Actor.Worker self;
- private final TraversalMatrix<?, ?> matrix;
- private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
- //
- private Address.Worker neighborWorker;
- private boolean isLeader;
- private Terminate terminate = null;
- private boolean voteToHalt = false;
- private Map<String, Barrier> barriers = new HashMap<>();
-
- public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal) {
- this.self = self;
- // System.out.println("worker[created]: " + this.self.address().getId());
- // set up partition and traversal information
- final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
- TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
- this.matrix = new TraversalMatrix<>(traversal);
- Distributing.configure(traversal, false, true);
- Pushing.configure(traversal, true, false);
- //////
- final GraphStep graphStep = (GraphStep) traversal.getStartStep();
- if (0 == graphStep.getIds().length)
- ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.self.partition()::vertices : this.self.partition()::edges);
- else {
- if (graphStep.returnsVertex())
- ((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier(
- () -> IteratorUtils.filter(self.partition().vertices(graphStep.getIds()), this.self.partition()::contains));
- else
- ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier(
- () -> IteratorUtils.filter(self.partition().edges(graphStep.getIds()), this.self.partition()::contains));
- }
- }
-
- @Override
- public void setup() {
- // create termination ring topology
- final int i = this.self.workers().indexOf(this.self.address());
- this.neighborWorker = this.self.workers().get(i == this.self.workers().size() - 1 ? 0 : i + 1);
- this.isLeader = i == 0;
- for (int j = 0; j < this.self.partitioner().getPartitions().size(); j++) {
- this.partitionToWorkerMap.put(this.self.partitioner().getPartitions().get(j), this.self.workers().get(j));
- }
- }
-
- @Override
- public void execute(final M message) {
- //System.out.println(message + "::" + this.isLeader);
- if (message instanceof StartMessage) {
- // initial message from master that says: "start processing"
- final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
- while (step.hasNext()) {
- this.sendTraverser(step.next());
- }
- } else if (message instanceof Traverser.Admin) {
- this.processTraverser((Traverser.Admin) message);
- } else if (message instanceof SideEffectSetMessage) {
- this.matrix.getTraversal().getSideEffects().set(((SideEffectSetMessage) message).getKey(), ((SideEffectSetMessage) message).getValue());
- } else if (message instanceof BarrierDoneMessage) {
- final Step<?, ?> step = (Step) this.matrix.getStepById(((BarrierDoneMessage) message).getStepId());
- while (step.hasNext()) {
- sendTraverser(step.next());
- }
- } else if (message instanceof Terminate) {
- assert null == this.terminate;
- this.terminate = (Terminate) message;
- if (!this.barriers.isEmpty()) {
- for (final Barrier barrier : this.barriers.values()) {
- while (barrier.hasNextBarrier()) {
- this.self.send(this.self.master(), new BarrierAddMessage(barrier));
- }
- }
- this.barriers.clear();
- }
- // use termination token to determine termination condition
- if (this.isLeader) {
- if (this.voteToHalt && Terminate.YES == this.terminate)
- this.self.send(this.self.master(), Terminate.YES);
- else
- this.self.send(this.neighborWorker, Terminate.YES);
- } else
- this.self.send(this.neighborWorker, this.voteToHalt ? this.terminate : Terminate.NO);
- this.terminate = null;
- this.voteToHalt = true;
- } else {
- throw new IllegalArgumentException("The following message is unknown: " + message);
- }
- }
-
- @Override
- public void terminate() {
-
- }
-
- //////////////
-
- private void processTraverser(final Traverser.Admin traverser) {
- assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.self.partition().contains((Element) traverser.get());
- final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
- step.addStart(traverser);
- if (step instanceof Barrier) {
- this.barriers.put(step.getId(), (Barrier) step);
- } else {
- while (step.hasNext()) {
- this.sendTraverser(step.next());
- }
- }
- }
-
- private void sendTraverser(final Traverser.Admin traverser) {
- this.voteToHalt = false;
- if (traverser.isHalted())
- this.self.send(this.self.master(), traverser);
- else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get()))
- this.self.send(this.partitionToWorkerMap.get(this.self.partitioner().getPartition((Element) traverser.get())), traverser);
- else
- this.self.send(this.self.address(), traverser);
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
deleted file mode 100644
index 0950435..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/WorkerTraversalSideEffects.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.traversal;
-
-import org.apache.tinkerpop.gremlin.process.actor.Actor;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.message.SideEffectAddMessage;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
-
-import java.util.Optional;
-import java.util.Set;
-import java.util.function.BinaryOperator;
-import java.util.function.Supplier;
-import java.util.function.UnaryOperator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class WorkerTraversalSideEffects implements TraversalSideEffects {
-
- private TraversalSideEffects sideEffects;
- private Actor.Worker worker;
-
-
- private WorkerTraversalSideEffects() {
- // for serialization
- }
-
- public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final Actor.Worker worker) {
- this.sideEffects = sideEffects;
- this.worker = worker;
- }
-
- public TraversalSideEffects getSideEffects() {
- return this.sideEffects;
- }
-
- @Override
- public void set(final String key, final Object value) {
- this.sideEffects.set(key, value);
- }
-
- @Override
- public <V> V get(final String key) throws IllegalArgumentException {
- return this.sideEffects.get(key);
- }
-
- @Override
- public void remove(final String key) {
- this.sideEffects.remove(key);
- }
-
- @Override
- public Set<String> keys() {
- return this.sideEffects.keys();
- }
-
- @Override
- public void add(final String key, final Object value) {
- this.sideEffects.add(key, value);
- this.worker.send(this.worker.master(), new SideEffectAddMessage(key, value));
- }
-
- @Override
- public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
- this.sideEffects.register(key, initialValue, reducer);
- }
-
- @Override
- public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
- this.sideEffects.registerIfAbsent(key, initialValue, reducer);
- }
-
- @Override
- public <V> BinaryOperator<V> getReducer(final String key) {
- return this.sideEffects.getReducer(key);
- }
-
- @Override
- public <V> Supplier<V> getSupplier(final String key) {
- return this.sideEffects.getSupplier(key);
- }
-
- @Override
- @Deprecated
- public void registerSupplier(final String key, final Supplier supplier) {
- this.sideEffects.registerSupplier(key, supplier);
- }
-
- @Override
- @Deprecated
- public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
- return this.sideEffects.getRegisteredSupplier(key);
- }
-
- @Override
- public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
- this.sideEffects.setSack(initialValue, splitOperator, mergeOperator);
- }
-
- @Override
- public <S> Supplier<S> getSackInitialValue() {
- return this.sideEffects.getSackInitialValue();
- }
-
- @Override
- public <S> UnaryOperator<S> getSackSplitter() {
- return this.sideEffects.getSackSplitter();
- }
-
- @Override
- public <S> BinaryOperator<S> getSackMerger() {
- return this.sideEffects.getSackMerger();
- }
-
- @Override
- public TraversalSideEffects clone() {
- try {
- final WorkerTraversalSideEffects clone = (WorkerTraversalSideEffects) super.clone();
- clone.sideEffects = this.sideEffects.clone();
- return clone;
- } catch (final CloneNotSupportedException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
-
- @Override
- public void mergeInto(final TraversalSideEffects sideEffects) {
- this.sideEffects.mergeInto(sideEffects);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java
deleted file mode 100644
index dba9b86..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierAddMessage.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.traversal.message;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class BarrierAddMessage {
-
- private final Object barrier;
- private final String stepId;
-
- public BarrierAddMessage(final Barrier barrier) {
- this.barrier = barrier.nextBarrier();
- this.stepId = ((Step) barrier).getId();
- }
-
- public Object getBarrier() {
- return this.barrier;
- }
-
- public String getStepId() {
- return this.stepId;
- }
-
-
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierDoneMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierDoneMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierDoneMessage.java
deleted file mode 100644
index 837a55f..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/BarrierDoneMessage.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.traversal.message;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class BarrierDoneMessage {
-
- private final String stepId;
-
- public BarrierDoneMessage(final Barrier barrier) {
- this.stepId = ((Step) barrier).getId();
-
- }
-
- public String getStepId() {
- return this.stepId;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectAddMessage.java
deleted file mode 100644
index 511c125..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectAddMessage.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.traversal.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SideEffectAddMessage {
-
- private final String key;
- private final Object value;
-
- public SideEffectAddMessage(final String key, final Object value) {
- this.value = value;
- this.key = key;
- }
-
- public String getKey() {
- return this.key;
- }
-
- public Object getValue() {
- return this.value;
- }
-}
-
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectSetMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectSetMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectSetMessage.java
deleted file mode 100644
index 31f83c2..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/SideEffectSetMessage.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.traversal.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class SideEffectSetMessage {
-
- private final String key;
- private final Object value;
-
- public SideEffectSetMessage(final String key, final Object value) {
- this.key = key;
- this.value = value;
- }
-
- public String getKey() {
- return this.key;
- }
-
- public Object getValue() {
- return this.value;
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/StartMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/StartMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/StartMessage.java
deleted file mode 100644
index 1b4292e..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/StartMessage.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.traversal.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class StartMessage {
-
- private static final StartMessage INSTANCE = new StartMessage();
-
- private StartMessage() {
- }
-
- public static StartMessage instance() {
- return INSTANCE;
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java
deleted file mode 100644
index 4ab789d..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/message/Terminate.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.traversal.message;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public enum Terminate {
-
- MAYBE, YES, NO
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
deleted file mode 100644
index e4520aa..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/step/map/TraversalActorProgramStep.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.traversal.step.map;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.actor.ActorProgram;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.TraversalActorProgram;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-
-import java.util.NoSuchElementException;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class TraversalActorProgramStep<S, E> extends AbstractStep<E, E> {
-
-
- private final Traversal.Admin<S, E> actorsTraversal;
- private final Configuration graphActorsConfiguration;
- private boolean first = true;
-
- public TraversalActorProgramStep(final Traversal.Admin<?, ?> traversal, final Configuration graphActorsConfiguration) {
- super(traversal);
- this.graphActorsConfiguration = graphActorsConfiguration;
- this.actorsTraversal = (Traversal.Admin) traversal.clone();
- this.actorsTraversal.setParent(EmptyStep.instance());
- }
-
- @Override
- protected Traverser.Admin<E> processNextStart() throws NoSuchElementException {
- if (this.first) {
- this.first = false;
- try {
- final GraphActors<TraverserSet<E>> graphActors = GraphActors.open(this.graphActorsConfiguration);
- final ActorProgram<TraverserSet<E>> actorProgram = new TraversalActorProgram<>(this.actorsTraversal);
- graphActors.program(actorProgram).submit(this.getTraversal().getGraph().get()).get().forEach(this.starts::add);
- } catch (final Exception e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- }
- return this.starts.next();
- }
-
- @Override
- public String toString() {
- return StringFactory.stepString(this, this.actorsTraversal);
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java
deleted file mode 100644
index 7e713de..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/decoration/ActorProgramStrategy.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.decoration;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
-import org.apache.tinkerpop.gremlin.process.actor.traversal.step.map.TraversalActorProgramStep;
-import org.apache.tinkerpop.gremlin.process.remote.traversal.strategy.decoration.RemoteStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.ProcessorTraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-
-import java.util.Collections;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ActorProgramStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy>
- implements TraversalStrategy.DecorationStrategy, ProcessorTraversalStrategy<GraphActors> {
-
- private static final Set<Class<? extends DecorationStrategy>> PRIORS = Collections.singleton(RemoteStrategy.class);
-
- private final Configuration graphActorsConfiguration;
-
- public ActorProgramStrategy(final GraphActors graphActors) {
- this.graphActorsConfiguration = graphActors.configuration();
- }
-
- @Override
- public void apply(final Traversal.Admin<?, ?> traversal) {
- ReadOnlyStrategy.instance().apply(traversal);
-
- if (!(traversal.getParent() instanceof EmptyStep))
- return;
-
- final TraversalActorProgramStep<?, ?> actorStep = new TraversalActorProgramStep<>(traversal, this.graphActorsConfiguration);
- TraversalHelper.removeAllSteps(traversal);
- traversal.addStep(actorStep);
-
- // validations
- assert traversal.getStartStep().equals(actorStep);
- assert traversal.getSteps().size() == 1;
- assert traversal.getEndStep() == actorStep;
- }
-
- @Override
- public Set<Class<? extends DecorationStrategy>> applyPrior() {
- return PRIORS;
- }
-
- ////////////////////////////////////////////////////////////
-
- @Override
- public Configuration getConfiguration() {
- return this.graphActorsConfiguration;
- }
-
- public static ActorProgramStrategy create(final Configuration configuration) {
- try {
- return new ActorProgramStrategy(GraphActors.open(configuration));
- } catch (final Exception e) {
- throw new IllegalArgumentException(e.getMessage(), e);
- }
- }
-
- @Override
- public GraphActors getProcessor() {
- return GraphActors.open(this.graphActorsConfiguration);
- }
-
-}
-
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java
deleted file mode 100644
index f6e93ef..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/strategy/verification/ActorVerificationStrategy.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.traversal.strategy.verification;
-
-import org.apache.tinkerpop.gremlin.process.traversal.Step;
-import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
-import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
-import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
-import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
-import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
-import org.apache.tinkerpop.gremlin.structure.Graph;
-
-import java.util.Collections;
-import java.util.Set;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class ActorVerificationStrategy extends AbstractTraversalStrategy<TraversalStrategy.VerificationStrategy> implements TraversalStrategy.VerificationStrategy {
-
- private static final ActorVerificationStrategy INSTANCE = new ActorVerificationStrategy();
-
- private ActorVerificationStrategy() {
- }
-
- @Override
- public void apply(final Traversal.Admin<?, ?> traversal) {
- if (!TraversalHelper.getStepsOfAssignableClass(InjectStep.class, traversal).isEmpty())
- throw new VerificationException("Inject traversal currently not supported", traversal);
- }
-
- public static ActorVerificationStrategy instance() {
- return INSTANCE;
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/DefaultActorsResult.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/DefaultActorsResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/DefaultActorsResult.java
deleted file mode 100644
index c650ba1..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/DefaultActorsResult.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.util;
-
-import org.apache.tinkerpop.gremlin.process.actor.ActorsResult;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class DefaultActorsResult<R> implements ActorsResult<R> {
-
- private R result;
-
- public DefaultActorsResult() {
-
- }
-
- public R getResult() {
- return this.result;
- }
-
- public void setResult(final R result) {
- this.result = result;
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/GraphActorsHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/GraphActorsHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/GraphActorsHelper.java
deleted file mode 100644
index eebee17..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/util/GraphActorsHelper.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.tinkerpop.gremlin.process.actor.util;
-
-import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-
-import java.util.Iterator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class GraphActorsHelper {
-
- private GraphActorsHelper() {
-
- }
-
- public static GraphActors configure(GraphActors actors, final Configuration configuration) {
- final Iterator<String> keys = IteratorUtils.asList(configuration.getKeys()).iterator();
- while (keys.hasNext()) {
- final String key = keys.next();
- if (key.equals(GraphActors.GRAPH_ACTORS_WORKERS))
- actors = actors.workers(configuration.getInt(GraphActors.GRAPH_ACTORS_WORKERS));
- else if (!key.equals(GraphActors.GRAPH_ACTORS))
- actors = actors.configure(key, configuration.getProperty(key));
- }
- return actors;
- }
-}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
new file mode 100644
index 0000000..0445968
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Actor.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors;
+
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Partitioner;
+
+import java.util.List;
+
+/**
+ * An Actor represents an isolated processing unit that can only be interacted with via messages.
+ * Actors are able to send and receive messages. The {@link GraphActors} framework has two types of actors:
+ * {@link Master} and {@link Worker}. A master actor is not associated with a particular graph {@link Partition}.
+ * Instead, its role is to coordinate the workers and ultimately, yield the final result of the submitted
+ * {@link ActorProgram}.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface Actor {
+
+ /**
+ * Get the {@link Partitioner} associated with the {@link GraphActors} system.
+ *
+ * @return the partitioner used to partition (logically and/or physically) the {@link org.apache.tinkerpop.gremlin.structure.Graph}
+ */
+ public Partitioner partitioner();
+
+ /**
+ * Get the {@link Address} of the actor.
+ *
+ * @return the actor's address
+ */
+ public Address address();
+
+ /**
+ * Get a list of the {@link Address} values of all the workers in the {@link GraphActors} system.
+ *
+ * @return the workers' addresses
+ */
+ public List<Address.Worker> workers();
+
+ /**
+ * Send a message from this actor to another actor given the recipient's {@link Address}.
+ *
+ * @param toActor the address of the actor to receive the message
+ * @param message the message being sent
+ * @param <M> the message type
+ */
+ public <M> void send(final Address toActor, final M message);
+
+ public interface Master extends Actor {
+
+ public Address.Master address();
+
+ public void close();
+
+ public <R> ActorsResult<R> result();
+
+ }
+
+ public interface Worker extends Actor {
+
+ public Address.Worker address();
+
+ public Address.Master master();
+
+ /**
+ * Get the {@link Partition} associated with this worker.
+ * In principle, this is the subset of the {@link org.apache.tinkerpop.gremlin.structure.Graph} that
+ * the worker is "data-local" to.
+ *
+ * @return the worker's partition
+ */
+ public Partition partition();
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
new file mode 100644
index 0000000..b1e3065
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorProgram.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface ActorProgram extends Cloneable {
+
+ public static final String ACTOR_PROGRAM = "gremlin.actorProgram";
+
+ /**
+ * When it is necessary to store the state of the ActorProgram, this method is called.
+ * This is typically required when the ActorProgram needs to be serialized to another machine.
+ * Note that what is stored is simply the instance/configuration state, not any processed data.
+ * The default implementation provided simply stores the ActorProgram class name for reflective reconstruction.
+ * It is typically a good idea to call {@code ActorProgram.super.storeState(configuration)} when overriding this method.
+ *
+ * @param configuration the configuration to store the state of the ActorProgram in.
+ */
+ public default void storeState(final Configuration configuration) {
+ configuration.setProperty(ACTOR_PROGRAM, this.getClass().getCanonicalName());
+ }
+
+ /**
+ * When it is necessary to load the state of the ActorProgram, this method is called.
+ * This is typically required when the ActorProgram needs to be serialized to another machine.
+ * Note that what is loaded is simply the instance state, not any processed data.
+ *
+ * @param graph the graph that the ActorProgram will run against
+ * @param configuration the configuration to load the state of the ActorProgram from.
+ */
+ public default void loadState(final Graph graph, final Configuration configuration) {
+
+ }
+
+ /**
+ * Create the {@link org.apache.tinkerpop.gremlin.process.actors.Actor.Worker} program.
+ * This is typically used by an {@link Actor.Worker} to spawn its program.
+ *
+ * @param worker the worker actor creating the worker program
+ * @return the worker program
+ */
+ public ActorProgram.Worker createWorkerProgram(final Actor.Worker worker);
+
+ /**
+ * Create the {@link org.apache.tinkerpop.gremlin.process.actors.Actor.Master} program.
+ * This is typically used by the {@link Actor.Master} to spawn its program.
+ *
+ * @param master the master actor creating the master program
+ * @return the master program
+ */
+ public ActorProgram.Master createMasterProgram(final Actor.Master master);
+
+ /**
+ * Get the ordered list of message classes where order determines the priority
+ * of message reception by the respective {@link Actor}. For instance,
+ * if an {@link Actor} has a message of type {@code X} and a message of type {@code Y}
+ * in its message buffer, and {@code X} has a higher priority, it will be fetched
+ * first from the buffer. If no list is provided, then message reception is FIFO.
+ * The default implementation returns an {@link Optional#empty()}.
+ *
+ * @return the optional ordered list of message priorities.
+ */
+ public default Optional<List<Class>> getMessagePriorities() {
+ return Optional.empty();
+ }
+
+ /**
+ * When multiple workers on a single machine need ActorProgram instances, it is possible to use clone.
+ * This will provide a speedier way of generating instances, over the {@link ActorProgram#storeState} and {@link ActorProgram#loadState} model.
+ * No default implementation is provided by this interface; an ActorProgram that is a stateless singleton may simply return itself.
+ *
+ * @return A clone of the ActorProgram object
+ */
+ @SuppressWarnings("CloneDoesntDeclareCloneNotSupportedException")
+ public ActorProgram clone();
+
+ /**
+ * The Worker program is executed by a worker process in the {@link GraphActors} system.
+ * There are many workers and a single master.
+ * All workers execute the same program.
+ *
+ * @param <M> The message type accepted by the worker
+ */
+ public static interface Worker<M> {
+
+ /**
+ * This method is evaluated when the worker process is spawned.
+ */
+ public void setup();
+
+ /**
+ * This method is evaluated when the worker receives a new message.
+ *
+ * @param message the received message
+ */
+ public void execute(final M message);
+
+ /**
+ * This method is evaluated when the worker process is destroyed.
+ */
+ public void terminate();
+
+ }
+
+ /**
+ * The Master program is executed by the master process in the {@link GraphActors} system.
+ * There are many workers and a single master.
+ *
+ * @param <M> The message type accepted by the master
+ */
+ public static interface Master<M> {
+ public void setup();
+
+ public void execute(final M message);
+
+ public void terminate();
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorsResult.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorsResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorsResult.java
new file mode 100644
index 0000000..beb7ab9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/ActorsResult.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface ActorsResult<R> {
+
+ public R getResult();
+
+ public void setResult(final R result);
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/a3eb72a4/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Address.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Address.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Address.java
new file mode 100644
index 0000000..894a961
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/Address.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors;
+
+import java.io.Serializable;
+import java.net.InetAddress;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public abstract class Address implements Serializable {
+
+ private final String id;
+ private final InetAddress location;
+
+ public Address(final String id, final InetAddress location) {
+ this.id = id;
+ this.location = location;
+ }
+
+ public InetAddress getLocation() {
+ return this.location;
+ }
+
+ public String getId() {
+ return this.id;
+ }
+
+ @Override
+ public boolean equals(final Object other) {
+ return other instanceof Address && ((Address) other).id.equals(this.id);
+ }
+
+ @Override
+ public int hashCode() {
+ return this.id.hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return this.id;
+ }
+
+ public static final class Master extends Address {
+
+ public Master(final String id, final InetAddress location) {
+ super(id, location);
+ }
+
+ }
+
+ public static final class Worker extends Address {
+
+ public Worker(final String id, final InetAddress location) {
+ super(id, location);
+ }
+
+ }
+}