You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/11 17:53:49 UTC
[41/50] [abbrv] tinkerpop git commit: refactored GraphActors
packaging -- its not actor/, but actors/. JavaDoc and various cleanups. Also,
about to NOT serialize a traversal but instead use Bytecode. Next push will do
this with TraveraslVertexProgram.
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActors.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActors.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActors.java
new file mode 100644
index 0000000..9b8fe38
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActors.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors;
+
+import org.apache.commons.configuration.BaseConfiguration;
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.Processor;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.decoration.ActorProgramStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.ProcessorTraversalStrategy;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+import java.util.concurrent.Future;
+
+/**
+ * GraphActors is a message-passing based graph {@link Processor} that is:
+ * asynchronous, distributed, and partition-centric.
+ *
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public interface GraphActors<R> extends Processor {
+
+ public static final String GRAPH_ACTORS = "gremlin.graphActors";
+ public static final String GRAPH_ACTORS_WORKERS = "gremlin.graphActors.workers";
+
+ /**
+ * Provide the {@link ActorProgram} that the GraphActors will execute.
+ *
+ * @param program the program to execute
+ * @return the updated GraphActors with newly defined program
+ */
+ public GraphActors<R> program(final ActorProgram program);
+
+ /**
+ * Specify the number of workers per {@link Graph} {@link org.apache.tinkerpop.gremlin.structure.Partition}.
+ *
+ * @param workers the number of workers per partition
+ * @return the updated GraphActors with newly defined workers
+ */
+ public GraphActors<R> workers(final int workers);
+
+ /**
+ * Add an arbitrary configuration to the GraphActors system.
+ * Typically, these configurations are provider-specific and do not generalize across all GraphActor implementations.
+ *
+ * @param key the key of the configuration
+ * @param value the value of the configuration
+ * @return the updated GraphActors with newly defined configuration
+ */
+ public GraphActors<R> configure(final String key, final Object value);
+
+ /**
+ * Execute the {@link ActorProgram} on the {@link GraphActors} system against the specified {@link Graph}.
+ *
+ * @return a {@link Future} denoting a reference to the asynchronous computation's result
+ */
+ @Override
+ public Future<R> submit(final Graph graph);
+
+ /**
+ * Returns an {@link ActorProgramStrategy} which enables a {@link Traversal} to execute on {@link GraphActors}.
+ *
+ * @return a {@link org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy} capable of executing traversals on a GraphActors system
+ */
+ @Override
+ public default ProcessorTraversalStrategy<GraphActors> getProcessorTraversalStrategy() {
+ return new ActorProgramStrategy(this);
+ }
+
+ /**
+ * Create an arbitrary GraphActors system given the information contained in the provided {@link Configuration}.
+ *
+ * @param configuration the {@link Configuration} containing, at minimum, {@link GraphActors#GRAPH_ACTORS} system class name
+ * @param <A> the particular type of GraphActors
+ * @return a constructed GraphActors system
+ */
+ public static <A extends GraphActors> A open(final Configuration configuration) {
+ try {
+ return (A) Class.forName(configuration.getString(GRAPH_ACTORS)).getMethod("open", Configuration.class).invoke(null, configuration);
+ } catch (final Exception e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+
+ public static <A extends GraphActors> A open(final Class<A> graphActorsClass) {
+ final BaseConfiguration configuration = new BaseConfiguration();
+ configuration.setProperty(GRAPH_ACTORS, graphActorsClass.getCanonicalName());
+ return GraphActors.open(configuration);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
new file mode 100644
index 0000000..484b904
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalActorProgram.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.jsr223.JavaTranslator;
+import org.apache.tinkerpop.gremlin.process.actors.Actor;
+import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.decoration.ActorProgramStrategy;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.verification.ActorVerificationStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Bytecode;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategies;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.InlineFilterStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.LazyBarrierStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.MatchPredicateStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.PathRetractionStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.optimization.RepeatUnrollStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TraversalActorProgram<R> implements ActorProgram {
+
+ public static final String TRAVERSAL_ACTOR_PROGRAM_BYTECODE = "gremlin.traversalActorProgram.bytecode";
+
+ private static final List<Class> MESSAGE_PRIORITIES = Arrays.asList(
+ StartMessage.class,
+ Traverser.class,
+ SideEffectAddMessage.class,
+ BarrierAddMessage.class,
+ SideEffectSetMessage.class,
+ BarrierDoneMessage.class,
+ Terminate.class);
+
+ private Traversal.Admin<?, R> traversal;
+ public TraverserSet<R> result = new TraverserSet<>();
+
+ public TraversalActorProgram(final Traversal.Admin<?, R> traversal) {
+ this.traversal = traversal;
+ final TraversalStrategies strategies = this.traversal.getStrategies().clone();
+ strategies.addStrategies(ActorVerificationStrategy.instance(), ReadOnlyStrategy.instance());
+ // TODO: make TinkerGraph/etc. strategies smart about actors
+ new ArrayList<>(strategies.toList()).stream().
+ filter(s -> s instanceof TraversalStrategy.ProviderOptimizationStrategy).
+ map(TraversalStrategy::getClass).
+ forEach(strategies::removeStrategies);
+ strategies.removeStrategies(
+ ActorProgramStrategy.class,
+ LazyBarrierStrategy.class,
+ RepeatUnrollStrategy.class,
+ MatchPredicateStrategy.class,
+ InlineFilterStrategy.class,
+ PathRetractionStrategy.class);
+ this.traversal.setStrategies(strategies);
+ this.traversal.applyStrategies();
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ configuration.setProperty(ACTOR_PROGRAM, TraversalActorProgram.class.getCanonicalName());
+ configuration.setProperty(TRAVERSAL_ACTOR_PROGRAM_BYTECODE, this.traversal.getBytecode());
+ }
+
+ @Override
+ public void loadState(final Graph graph, final Configuration configuration) {
+ final Bytecode bytecode = (Bytecode) configuration.getProperty(TRAVERSAL_ACTOR_PROGRAM_BYTECODE);
+ this.traversal = (Traversal.Admin<?, R>) JavaTranslator.of(graph.traversal()).translate(bytecode);
+ }
+
+ @Override
+ public TraversalActorProgram.Worker createWorkerProgram(final Actor.Worker worker) {
+ return new TraversalWorkerProgram(worker, this.traversal.clone());
+ }
+
+ @Override
+ public TraversalActorProgram.Master createMasterProgram(final Actor.Master master) {
+ return new TraversalMasterProgram(master, this.traversal.clone(), this.result);
+ }
+
+ @Override
+ public Optional<List<Class>> getMessagePriorities() {
+ return Optional.of(MESSAGE_PRIORITIES);
+ }
+
+ @Override
+ public TraversalActorProgram<R> clone() {
+ try {
+ final TraversalActorProgram<R> clone = (TraversalActorProgram<R>) super.clone();
+ clone.traversal = this.traversal.clone();
+ return clone;
+ } catch (final CloneNotSupportedException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
new file mode 100644
index 0000000..e447cdb
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalMasterProgram.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actors.Actor;
+import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actors.Address;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class TraversalMasterProgram implements ActorProgram.Master<Object> {
+
+ private final Actor.Master master;
+ private final Traversal.Admin<?, ?> traversal;
+ private final TraversalMatrix<?, ?> matrix;
+ private Map<String, Barrier> barriers = new HashMap<>();
+ private final TraverserSet<?> results;
+ private Address.Worker leaderWorker;
+ private int orderCounter = -1;
+ private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
+
+ public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final TraverserSet<?> results) {
+ this.traversal = traversal;
+ // System.out.println("master[created]: " + master.address().getId());
+ // System.out.println(this.traversal);
+ this.matrix = new TraversalMatrix<>(this.traversal);
+ this.results = results;
+ this.master = master;
+ Distributing.configure(this.traversal, true, true);
+ Pushing.configure(this.traversal, true, false);
+ }
+
+ @Override
+ public void setup() {
+ this.leaderWorker = this.master.workers().get(0);
+ for (int i = 0; i < this.master.partitioner().getPartitions().size(); i++) {
+ this.partitionToWorkerMap.put(this.master.partitioner().getPartitions().get(i), this.master.workers().get(i));
+ }
+ this.broadcast(StartMessage.instance());
+ this.master.send(this.leaderWorker, Terminate.MAYBE);
+ }
+
+ @Override
+ public void execute(final Object message) {
+ if (message instanceof Traverser.Admin) {
+ this.processTraverser((Traverser.Admin) message);
+ } else if (message instanceof BarrierAddMessage) {
+ final Barrier barrier = (Barrier) this.matrix.getStepById(((BarrierAddMessage) message).getStepId());
+ final Step<?, ?> step = (Step) barrier;
+ barrier.addBarrier(((BarrierAddMessage) message).getBarrier());
+ this.barriers.put(step.getId(), barrier);
+ } else if (message instanceof SideEffectAddMessage) {
+ this.traversal.getSideEffects().add(((SideEffectAddMessage) message).getKey(), ((SideEffectAddMessage) message).getValue());
+ } else if (message instanceof Terminate) {
+ assert Terminate.YES == message;
+ if (!this.barriers.isEmpty()) {
+ for (final Barrier barrier : this.barriers.values()) {
+ final Step<?, ?> step = (Step) barrier;
+ if (!(barrier instanceof LocalBarrier)) {
+ this.orderBarrier(step);
+ if (step instanceof OrderGlobalStep) this.orderCounter = 0;
+ while (step.hasNext()) {
+ this.sendTraverser(-1 == this.orderCounter ?
+ step.next() :
+ new OrderedTraverser<>(step.next(), this.orderCounter++));
+ }
+ } else {
+ if (step instanceof SideEffectCapable) {
+ final String key = ((SideEffectCapable) step).getSideEffectKey();
+ this.broadcast(new SideEffectSetMessage(key, this.traversal.getSideEffects().get(key)));
+ }
+ this.broadcast(new BarrierDoneMessage(barrier));
+ barrier.done();
+ }
+ }
+ this.barriers.clear();
+ this.master.send(this.leaderWorker, Terminate.MAYBE);
+ } else {
+ while (this.traversal.hasNext()) {
+ this.results.add((Traverser.Admin) this.traversal.nextTraverser());
+ }
+ if (this.orderCounter != -1)
+ this.results.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order()));
+
+ this.master.close();
+ }
+ } else {
+ throw new IllegalStateException("Unknown message:" + message);
+ }
+ }
+
+ @Override
+ public void terminate() {
+ this.master.result().setResult(this.results);
+ }
+
+ private void broadcast(final Object message) {
+ for (final Address.Worker worker : this.master.workers()) {
+ this.master.send(worker, message);
+ }
+ }
+
+ private void processTraverser(final Traverser.Admin traverser) {
+ if (traverser.isHalted() || traverser.get() instanceof Element) {
+ this.sendTraverser(traverser);
+ } else {
+ final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+ step.addStart(traverser);
+ if (step instanceof Barrier) {
+ this.barriers.put(step.getId(), (Barrier) step);
+ } else {
+ while (step.hasNext()) {
+ this.processTraverser(step.next());
+ }
+ }
+ }
+ }
+
+ private void sendTraverser(final Traverser.Admin traverser) {
+ if (traverser.isHalted())
+ this.results.add(traverser);
+ else if (traverser.get() instanceof Element)
+ this.master.send(this.partitionToWorkerMap.get(this.master.partitioner().getPartition((Element) traverser.get())), traverser);
+ else
+ this.master.send(this.master.address(), traverser);
+ }
+
+ private void orderBarrier(final Step step) {
+ if (this.orderCounter != -1 && step instanceof Barrier && (step instanceof RangeGlobalStep || step instanceof TailGlobalStep)) {
+ final Barrier barrier = (Barrier) step;
+ final TraverserSet<?> rangingBarrier = (TraverserSet<?>) barrier.nextBarrier();
+ rangingBarrier.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order()));
+ barrier.addBarrier(rangingBarrier);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
new file mode 100644
index 0000000..2aaa7b5
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/TraversalWorkerProgram.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actors.Actor;
+import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actors.Address;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierAddMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.BarrierDoneMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectSetMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.StartMessage;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.Terminate;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Distributing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Pushing;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Partition;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+final class TraversalWorkerProgram implements ActorProgram.Worker<Object> {
+
+ private final Actor.Worker self;
+ private final TraversalMatrix<?, ?> matrix;
+ private final Map<Partition, Address.Worker> partitionToWorkerMap = new HashMap<>();
+ //
+ private Address.Worker neighborWorker;
+ private boolean isLeader;
+ private Terminate terminate = null;
+ private boolean voteToHalt = false;
+ private Map<String, Barrier> barriers = new HashMap<>();
+
+ public TraversalWorkerProgram(final Actor.Worker self, final Traversal.Admin<?, ?> traversal) {
+ this.self = self;
+ // System.out.println("worker[created]: " + this.self.address().getId());
+ // set up partition and traversal information
+ final WorkerTraversalSideEffects sideEffects = new WorkerTraversalSideEffects(traversal.getSideEffects(), this.self);
+ TraversalHelper.applyTraversalRecursively(t -> t.setSideEffects(sideEffects), traversal);
+ this.matrix = new TraversalMatrix<>(traversal);
+ Distributing.configure(traversal, false, true);
+ Pushing.configure(traversal, true, false);
+ //////
+ final GraphStep graphStep = (GraphStep) traversal.getStartStep();
+ if (0 == graphStep.getIds().length)
+ ((GraphStep) traversal.getStartStep()).setIteratorSupplier(graphStep.returnsVertex() ? this.self.partition()::vertices : this.self.partition()::edges);
+ else {
+ if (graphStep.returnsVertex())
+ ((GraphStep<Vertex, Vertex>) traversal.getStartStep()).setIteratorSupplier(
+ () -> IteratorUtils.filter(self.partition().vertices(graphStep.getIds()), this.self.partition()::contains));
+ else
+ ((GraphStep<Edge, Edge>) traversal.getStartStep()).setIteratorSupplier(
+ () -> IteratorUtils.filter(self.partition().edges(graphStep.getIds()), this.self.partition()::contains));
+ }
+ }
+
+ @Override
+ public void setup() {
+ // create termination ring topology
+ final int i = this.self.workers().indexOf(this.self.address());
+ this.neighborWorker = this.self.workers().get(i == this.self.workers().size() - 1 ? 0 : i + 1);
+ this.isLeader = i == 0;
+ for (int j = 0; j < this.self.partitioner().getPartitions().size(); j++) {
+ this.partitionToWorkerMap.put(this.self.partitioner().getPartitions().get(j), this.self.workers().get(j));
+ }
+ }
+
+ @Override
+ public void execute(final Object message) {
+ //System.out.println(message + "::" + this.isLeader);
+ if (message instanceof StartMessage) {
+ // initial message from master that says: "start processing"
+ final GraphStep step = (GraphStep) this.matrix.getTraversal().getStartStep();
+ while (step.hasNext()) {
+ this.sendTraverser(step.next());
+ }
+ } else if (message instanceof Traverser.Admin) {
+ this.processTraverser((Traverser.Admin) message);
+ } else if (message instanceof SideEffectSetMessage) {
+ this.matrix.getTraversal().getSideEffects().set(((SideEffectSetMessage) message).getKey(), ((SideEffectSetMessage) message).getValue());
+ } else if (message instanceof BarrierDoneMessage) {
+ final Step<?, ?> step = (Step) this.matrix.getStepById(((BarrierDoneMessage) message).getStepId());
+ while (step.hasNext()) {
+ sendTraverser(step.next());
+ }
+ } else if (message instanceof Terminate) {
+ assert null == this.terminate;
+ this.terminate = (Terminate) message;
+ if (!this.barriers.isEmpty()) {
+ for (final Barrier barrier : this.barriers.values()) {
+ while (barrier.hasNextBarrier()) {
+ this.self.send(this.self.master(), new BarrierAddMessage(barrier));
+ }
+ }
+ this.barriers.clear();
+ }
+ // use termination token to determine termination condition
+ if (this.isLeader) {
+ if (this.voteToHalt && Terminate.YES == this.terminate)
+ this.self.send(this.self.master(), Terminate.YES);
+ else
+ this.self.send(this.neighborWorker, Terminate.YES);
+ } else
+ this.self.send(this.neighborWorker, this.voteToHalt ? this.terminate : Terminate.NO);
+ this.terminate = null;
+ this.voteToHalt = true;
+ } else {
+ throw new IllegalArgumentException("The following message is unknown: " + message);
+ }
+ }
+
+ @Override
+ public void terminate() {
+
+ }
+
+ //////////////
+
+ private void processTraverser(final Traverser.Admin traverser) {
+ assert !(traverser.get() instanceof Element) || !traverser.isHalted() || this.self.partition().contains((Element) traverser.get());
+ final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
+ step.addStart(traverser);
+ if (step instanceof Barrier) {
+ this.barriers.put(step.getId(), (Barrier) step);
+ } else {
+ while (step.hasNext()) {
+ this.sendTraverser(step.next());
+ }
+ }
+ }
+
+ private void sendTraverser(final Traverser.Admin traverser) {
+ this.voteToHalt = false;
+ if (traverser.isHalted())
+ this.self.send(this.self.master(), traverser);
+ else if (traverser.get() instanceof Element && !this.self.partition().contains((Element) traverser.get()))
+ this.self.send(this.partitionToWorkerMap.get(this.self.partitioner().getPartition((Element) traverser.get())), traverser);
+ else
+ this.self.send(this.self.address(), traverser);
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
new file mode 100644
index 0000000..b660eda
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/WorkerTraversalSideEffects.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal;
+
+import org.apache.tinkerpop.gremlin.process.actors.Actor;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.message.SideEffectAddMessage;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class WorkerTraversalSideEffects implements TraversalSideEffects {
+
+ private TraversalSideEffects sideEffects;
+ private Actor.Worker worker;
+
+
+ private WorkerTraversalSideEffects() {
+ // for serialization
+ }
+
+ public WorkerTraversalSideEffects(final TraversalSideEffects sideEffects, final Actor.Worker worker) {
+ this.sideEffects = sideEffects;
+ this.worker = worker;
+ }
+
+ public TraversalSideEffects getSideEffects() {
+ return this.sideEffects;
+ }
+
+ @Override
+ public void set(final String key, final Object value) {
+ this.sideEffects.set(key, value);
+ }
+
+ @Override
+ public <V> V get(final String key) throws IllegalArgumentException {
+ return this.sideEffects.get(key);
+ }
+
+ @Override
+ public void remove(final String key) {
+ this.sideEffects.remove(key);
+ }
+
+ @Override
+ public Set<String> keys() {
+ return this.sideEffects.keys();
+ }
+
+ @Override
+ public void add(final String key, final Object value) {
+ this.sideEffects.add(key, value);
+ this.worker.send(this.worker.master(), new SideEffectAddMessage(key, value));
+ }
+
+ @Override
+ public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+ this.sideEffects.register(key, initialValue, reducer);
+ }
+
+ @Override
+ public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+ this.sideEffects.registerIfAbsent(key, initialValue, reducer);
+ }
+
+ @Override
+ public <V> BinaryOperator<V> getReducer(final String key) {
+ return this.sideEffects.getReducer(key);
+ }
+
+ @Override
+ public <V> Supplier<V> getSupplier(final String key) {
+ return this.sideEffects.getSupplier(key);
+ }
+
+ @Override
+ @Deprecated
+ public void registerSupplier(final String key, final Supplier supplier) {
+ this.sideEffects.registerSupplier(key, supplier);
+ }
+
+ @Override
+ @Deprecated
+ public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
+ return this.sideEffects.getRegisteredSupplier(key);
+ }
+
+ @Override
+ public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
+ this.sideEffects.setSack(initialValue, splitOperator, mergeOperator);
+ }
+
+ @Override
+ public <S> Supplier<S> getSackInitialValue() {
+ return this.sideEffects.getSackInitialValue();
+ }
+
+ @Override
+ public <S> UnaryOperator<S> getSackSplitter() {
+ return this.sideEffects.getSackSplitter();
+ }
+
+ @Override
+ public <S> BinaryOperator<S> getSackMerger() {
+ return this.sideEffects.getSackMerger();
+ }
+
+ @Override
+ public TraversalSideEffects clone() {
+ try {
+ final WorkerTraversalSideEffects clone = (WorkerTraversalSideEffects) super.clone();
+ clone.sideEffects = this.sideEffects.clone();
+ return clone;
+ } catch (final CloneNotSupportedException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void mergeInto(final TraversalSideEffects sideEffects) {
+ this.sideEffects.mergeInto(sideEffects);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
new file mode 100644
index 0000000..ac4c61d
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierAddMessage.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierAddMessage {
+
+ private final Object barrier;
+ private final String stepId;
+
+ public BarrierAddMessage(final Barrier barrier) {
+ this.barrier = barrier.nextBarrier();
+ this.stepId = ((Step) barrier).getId();
+ }
+
+ public Object getBarrier() {
+ return this.barrier;
+ }
+
+ public String getStepId() {
+ return this.stepId;
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierDoneMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierDoneMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierDoneMessage.java
new file mode 100644
index 0000000..7979c33
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/BarrierDoneMessage.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class BarrierDoneMessage {
+
+ private final String stepId;
+
+ public BarrierDoneMessage(final Barrier barrier) {
+ this.stepId = ((Step) barrier).getId();
+
+ }
+
+ public String getStepId() {
+ return this.stepId;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
new file mode 100644
index 0000000..1c0a9de
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectAddMessage.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectAddMessage {
+
+ private final String key;
+ private final Object value;
+
+ public SideEffectAddMessage(final String key, final Object value) {
+ this.value = value;
+ this.key = key;
+ }
+
+ public String getKey() {
+ return this.key;
+ }
+
+ public Object getValue() {
+ return this.value;
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectSetMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectSetMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectSetMessage.java
new file mode 100644
index 0000000..84788f9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/SideEffectSetMessage.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectSetMessage {
+
+ private final String key;
+ private final Object value;
+
+ public SideEffectSetMessage(final String key, final Object value) {
+ this.key = key;
+ this.value = value;
+ }
+
+ public String getKey() {
+ return this.key;
+ }
+
+ public Object getValue() {
+ return this.value;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/StartMessage.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/StartMessage.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/StartMessage.java
new file mode 100644
index 0000000..e704033
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/StartMessage.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class StartMessage {
+
+ private static final StartMessage INSTANCE = new StartMessage();
+
+ private StartMessage() {
+ }
+
+ public static StartMessage instance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
new file mode 100644
index 0000000..5621528
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/message/Terminate.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.message;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public enum Terminate {
+
+ MAYBE, YES, NO
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
new file mode 100644
index 0000000..d6b8858
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/step/map/TraversalActorProgramStep.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.step.map;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.actors.ActorProgram;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.TraversalActorProgram;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+
+import java.util.NoSuchElementException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class TraversalActorProgramStep<S, E> extends AbstractStep<E, E> {
+
+
+ private final Traversal.Admin<S, E> actorsTraversal;
+ private final Configuration graphActorsConfiguration;
+ private boolean first = true;
+
+ public TraversalActorProgramStep(final Traversal.Admin<?, ?> traversal, final Configuration graphActorsConfiguration) {
+ super(traversal);
+ this.graphActorsConfiguration = graphActorsConfiguration;
+ this.actorsTraversal = (Traversal.Admin) traversal.clone();
+ this.actorsTraversal.setParent(EmptyStep.instance());
+ }
+
+ @Override
+ protected Traverser.Admin<E> processNextStart() throws NoSuchElementException {
+ if (this.first) {
+ this.first = false;
+ try {
+ final GraphActors<TraverserSet<E>> graphActors = GraphActors.open(this.graphActorsConfiguration);
+ final ActorProgram actorProgram = new TraversalActorProgram<>(this.actorsTraversal);
+ graphActors.program(actorProgram).submit(this.getTraversal().getGraph().get()).get().forEach(this.starts::add);
+ } catch (final Exception e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+ return this.starts.next();
+ }
+
+ @Override
+ public String toString() {
+ return StringFactory.stepString(this, this.actorsTraversal);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/decoration/ActorProgramStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/decoration/ActorProgramStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/decoration/ActorProgramStrategy.java
new file mode 100644
index 0000000..6e4365e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/decoration/ActorProgramStrategy.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.decoration;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.traversal.step.map.TraversalActorProgramStep;
+import org.apache.tinkerpop.gremlin.process.remote.traversal.strategy.decoration.RemoteStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.ProcessorTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.ReadOnlyStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+import java.util.Collections;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ActorProgramStrategy extends AbstractTraversalStrategy<TraversalStrategy.DecorationStrategy>
+ implements TraversalStrategy.DecorationStrategy, ProcessorTraversalStrategy<GraphActors> {
+
+ private static final Set<Class<? extends DecorationStrategy>> PRIORS = Collections.singleton(RemoteStrategy.class);
+
+ private final Configuration graphActorsConfiguration;
+
+ public ActorProgramStrategy(final GraphActors graphActors) {
+ this.graphActorsConfiguration = graphActors.configuration();
+ }
+
+ @Override
+ public void apply(final Traversal.Admin<?, ?> traversal) {
+ ReadOnlyStrategy.instance().apply(traversal);
+
+ if (!(traversal.getParent() instanceof EmptyStep))
+ return;
+
+ final TraversalActorProgramStep<?, ?> actorStep = new TraversalActorProgramStep<>(traversal, this.graphActorsConfiguration);
+ TraversalHelper.removeAllSteps(traversal);
+ traversal.addStep(actorStep);
+
+ // validations
+ assert traversal.getStartStep().equals(actorStep);
+ assert traversal.getSteps().size() == 1;
+ assert traversal.getEndStep() == actorStep;
+ }
+
+ @Override
+ public Set<Class<? extends DecorationStrategy>> applyPrior() {
+ return PRIORS;
+ }
+
+ ////////////////////////////////////////////////////////////
+
+ @Override
+ public Configuration getConfiguration() {
+ return this.graphActorsConfiguration;
+ }
+
+ public static ActorProgramStrategy create(final Configuration configuration) {
+ try {
+ return new ActorProgramStrategy(GraphActors.open(configuration));
+ } catch (final Exception e) {
+ throw new IllegalArgumentException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public GraphActors getProcessor() {
+ return GraphActors.open(this.graphActorsConfiguration);
+ }
+
+}
+
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
new file mode 100644
index 0000000..cdf5465
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/traversal/strategy/verification/ActorVerificationStrategy.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.traversal.strategy.verification;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
+import org.apache.tinkerpop.gremlin.process.traversal.strategy.verification.VerificationException;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class ActorVerificationStrategy extends AbstractTraversalStrategy<TraversalStrategy.VerificationStrategy> implements TraversalStrategy.VerificationStrategy {
+
+ private static final ActorVerificationStrategy INSTANCE = new ActorVerificationStrategy();
+
+ private ActorVerificationStrategy() {
+ }
+
+ @Override
+ public void apply(final Traversal.Admin<?, ?> traversal) {
+ if (!TraversalHelper.getStepsOfAssignableClass(InjectStep.class, traversal).isEmpty())
+ throw new VerificationException("Inject traversal currently not supported", traversal);
+ }
+
+ public static ActorVerificationStrategy instance() {
+ return INSTANCE;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/DefaultActorsResult.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/DefaultActorsResult.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/DefaultActorsResult.java
new file mode 100644
index 0000000..208a9a1
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/DefaultActorsResult.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.util;
+
+import org.apache.tinkerpop.gremlin.process.actors.ActorsResult;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class DefaultActorsResult<R> implements ActorsResult<R> {
+
+ private R result;
+
+ public DefaultActorsResult() {
+
+ }
+
+ public R getResult() {
+ return this.result;
+ }
+
+ public void setResult(final R result) {
+ this.result = result;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/GraphActorsHelper.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/GraphActorsHelper.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/GraphActorsHelper.java
new file mode 100644
index 0000000..2af1ecd
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actors/util/GraphActorsHelper.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.actors.util;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.Iterator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GraphActorsHelper {
+
+ private GraphActorsHelper() {
+
+ }
+
+ public static GraphActors configure(GraphActors actors, final Configuration configuration) {
+ final Iterator<String> keys = IteratorUtils.asList(configuration.getKeys()).iterator();
+ while (keys.hasNext()) {
+ final String key = keys.next();
+ if (key.equals(GraphActors.GRAPH_ACTORS_WORKERS))
+ actors = actors.workers(configuration.getInt(GraphActors.GRAPH_ACTORS_WORKERS));
+ else if (!key.equals(GraphActors.GRAPH_ACTORS))
+ actors = actors.configure(key, configuration.getProperty(key));
+ }
+ return actors;
+ }
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
index 96dae61..42bd864 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/TraversalStrategies.java
@@ -18,7 +18,7 @@
*/
package org.apache.tinkerpop.gremlin.process.traversal;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.traversal.strategy.optimization.GraphFilterStrategy;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.decoration.ConnectiveStrategy;
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
index f7c350d..ee7a196 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/StringFactory.java
@@ -19,7 +19,7 @@
package org.apache.tinkerpop.gremlin.structure.util;
import org.apache.tinkerpop.gremlin.process.Processor;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
import org.apache.tinkerpop.gremlin.process.computer.ComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java
index 43b3608..482577f 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphManager.java
@@ -19,7 +19,7 @@
package org.apache.tinkerpop.gremlin;
import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java
index 9d63b3c..d0d877d 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/GraphProvider.java
@@ -19,7 +19,7 @@
package org.apache.tinkerpop.gremlin;
import org.apache.commons.configuration.Configuration;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
+import org.apache.tinkerpop.gremlin.process.actors.GraphActors;
import org.apache.tinkerpop.gremlin.process.computer.GraphComputer;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalStrategy;
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/3ac32164/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
index ec3ece2..cc9d995 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/actors/GraphActorsTest.java
@@ -21,7 +21,6 @@ package org.apache.tinkerpop.gremlin.process.actors;
import org.apache.tinkerpop.gremlin.LoadGraphWith;
import org.apache.tinkerpop.gremlin.process.AbstractGremlinProcessTest;
-import org.apache.tinkerpop.gremlin.process.actor.GraphActors;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.junit.Test;