You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/07 16:55:19 UTC
tinkerpop git commit: I now have a general purpose aggregator pattern
for Akka. I don't know if this is acceptable practice,
but it works. I have implemented side-effects,
but its not working properly yet. I think I might make a SideEffectActor as a
way
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1564 dbadfb935 -> cb1073884
I now have a general purpose aggregator pattern for Akka. I don't know if this is acceptable practice, but it works. I have implemented side-effects, but its not working properly yet. I think I might make a SideEffectActor as a way of having a global side-effect blackboard. Committing what I have to save the work thus far.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/cb107388
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/cb107388
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/cb107388
Branch: refs/heads/TINKERPOP-1564
Commit: cb10738845da6d506a000d081e5d27320bf5e2e0
Parents: dbadfb9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 7 09:55:16 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 7 09:55:16 2016 -0700
----------------------------------------------------------------------
.../akka/DistributedTraversalSideEffects.java | 148 +++++++++++++++++++
.../process/akka/MasterTraversalActor.java | 43 +++---
.../process/akka/TinkerActorSystem.java | 2 +-
.../process/akka/WorkerTraversalActor.java | 13 +-
.../messages/HaltSynchronizationMessage.java | 10 ++
.../akka/messages/SideEffectMessage.java | 52 +++++++
6 files changed, 245 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cb107388/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
new file mode 100644
index 0000000..5be17c6
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
+
+import akka.actor.ActorContext;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class DistributedTraversalSideEffects implements TraversalSideEffects {
+
+ private TraversalSideEffects sideEffects;
+ private ActorContext worker;
+
+ private DistributedTraversalSideEffects() {
+ // for serialization
+ }
+
+ public DistributedTraversalSideEffects(final TraversalSideEffects sideEffects, final ActorContext worker) {
+ this.sideEffects = sideEffects;
+ this.worker = worker;
+ }
+
+ public TraversalSideEffects getSideEffects() {
+ return this.sideEffects;
+ }
+
+ @Override
+ public void set(final String key, final Object value) {
+ this.sideEffects.set(key, value);
+ }
+
+ @Override
+ public <V> V get(final String key) throws IllegalArgumentException {
+ return this.sideEffects.get(key);
+ }
+
+ @Override
+ public void remove(final String key) {
+ this.sideEffects.remove(key);
+ }
+
+ @Override
+ public Set<String> keys() {
+ return this.sideEffects.keys();
+ }
+
+ @Override
+ public void add(final String key, final Object value) {
+ this.worker.self().tell(new SideEffectMessage(key, value), this.worker.parent());
+ }
+
+ @Override
+ public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+ this.sideEffects.register(key, initialValue, reducer);
+ }
+
+ @Override
+ public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+ this.sideEffects.registerIfAbsent(key, initialValue, reducer);
+ }
+
+ @Override
+ public <V> BinaryOperator<V> getReducer(final String key) {
+ return this.sideEffects.getReducer(key);
+ }
+
+ @Override
+ public <V> Supplier<V> getSupplier(final String key) {
+ return this.sideEffects.getSupplier(key);
+ }
+
+ @Override
+ @Deprecated
+ public void registerSupplier(final String key, final Supplier supplier) {
+ this.sideEffects.registerSupplier(key, supplier);
+ }
+
+ @Override
+ @Deprecated
+ public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
+ return this.sideEffects.getRegisteredSupplier(key);
+ }
+
+ @Override
+ public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
+ this.sideEffects.setSack(initialValue, splitOperator, mergeOperator);
+ }
+
+ @Override
+ public <S> Supplier<S> getSackInitialValue() {
+ return this.sideEffects.getSackInitialValue();
+ }
+
+ @Override
+ public <S> UnaryOperator<S> getSackSplitter() {
+ return this.sideEffects.getSackSplitter();
+ }
+
+ @Override
+ public <S> BinaryOperator<S> getSackMerger() {
+ return this.sideEffects.getSackMerger();
+ }
+
+ @Override
+ public TraversalSideEffects clone() {
+ try {
+ final DistributedTraversalSideEffects clone = (DistributedTraversalSideEffects) super.clone();
+ clone.sideEffects = this.sideEffects.clone();
+ return clone;
+ } catch (final CloneNotSupportedException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public void mergeInto(final TraversalSideEffects sideEffects) {
+ this.sideEffects.mergeInto(sideEffects);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cb107388/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index 18e8fb3..769c77c 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.structure.Partitioner;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
import java.util.ArrayList;
import java.util.HashMap;
@@ -48,12 +49,11 @@ import java.util.Map;
*/
public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
- private Traversal.Admin<?, ?> traversal;
- private TraversalMatrix<?, ?> matrix;
+ private final Traversal.Admin<?, ?> traversal;
+ private final TraversalMatrix<?, ?> matrix;
private final Partitioner partitioner;
private List<ActorPath> workers;
-
- private Map<String, Integer> responseCounter = new HashMap<>();
+ private final Map<String, Integer> synchronizationCounters = new HashMap<>();
public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
System.out.println("master[created]: " + self().path());
@@ -63,12 +63,6 @@ public final class MasterTraversalActor extends AbstractActor implements Require
this.partitioner = partitioner;
this.initializeWorkers();
- /*context().system().scheduler().schedule(
- Duration.create(1, TimeUnit.NANOSECONDS),
- Duration.create(1, TimeUnit.NANOSECONDS),
- () -> self().tell(System.currentTimeMillis(), ActorRef.noSender()),
- context().system().dispatcher());*/
-
receive(ReceiveBuilder.
match(Traverser.Admin.class, traverser -> {
this.processTraverser(traverser);
@@ -79,9 +73,9 @@ public final class MasterTraversalActor extends AbstractActor implements Require
broadcast(new BarrierSynchronizationMessage(barrierStep));
}).
match(BarrierSynchronizationMessage.class, barrierSync -> {
- final Integer counter = this.responseCounter.get(barrierSync.getStepId());
+ final Integer counter = this.synchronizationCounters.get(barrierSync.getStepId());
final int newCounter = null == counter ? 1 : counter + 1;
- this.responseCounter.put(barrierSync.getStepId(), newCounter);
+ this.synchronizationCounters.put(barrierSync.getStepId(), newCounter);
if (newCounter == this.workers.size()) {
final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(barrierSync.getStepId());
while (step.hasNext()) {
@@ -89,12 +83,21 @@ public final class MasterTraversalActor extends AbstractActor implements Require
}
}
}).
+ match(SideEffectMessage.class, sideEffect -> {
+ sideEffect.addSideEffect(this.traversal);
+ this.broadcast(new SideEffectMessage(sideEffect.getKey(), sideEffect.getValue()));
+ }).
match(HaltSynchronizationMessage.class, haltSync -> {
- final Integer counter = this.responseCounter.get(Traverser.Admin.HALT);
- final int newCounter = null == counter ? 1 : counter + 1;
- this.responseCounter.put(Traverser.Admin.HALT, newCounter);
- if (newCounter == this.workers.size()) {
- context().system().terminate();
+ if (haltSync.isHalt()) {
+ final Integer counter = this.synchronizationCounters.get(Traverser.Admin.HALT);
+ final int newCounter = null == counter ? 1 : counter + 1;
+ this.synchronizationCounters.put(Traverser.Admin.HALT, newCounter);
+ if (newCounter == this.workers.size()) {
+ context().system().terminate();
+ }
+ } else {
+ this.synchronizationCounters.remove(Traverser.Admin.HALT);
+ this.broadcast(new HaltSynchronizationMessage(true));
}
}).build());
}
@@ -113,18 +116,18 @@ public final class MasterTraversalActor extends AbstractActor implements Require
private void broadcast(final Object message) {
for (final ActorPath worker : this.workers) {
- context().actorSelection(worker).tell(message,self());
+ context().actorSelection(worker).tell(message, self());
}
}
private void processTraverser(final Traverser.Admin traverser) {
if (traverser.isHalted()) {
System.out.println("master[result]: " + traverser);
- broadcast(new HaltSynchronizationMessage());
+ broadcast(new HaltSynchronizationMessage(true));
} else {
if (traverser.get() instanceof Element) {
final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
- context().actorSelection("worker-" + otherPartition.hashCode()).tell(traverser,self());
+ context().actorSelection("worker-" + otherPartition.hashCode()).tell(traverser, self());
} else {
final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
step.addStart(traverser);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cb107388/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index ac21bcd..6fc370b 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -53,7 +53,7 @@ public final class TinkerActorSystem {
as("a").out().as("b"),
as("b").in().as("c"),
as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin();*/
- final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().repeat(both()).times(2).groupCount().select(Column.keys).unfold().valueMap().count().asAdmin();
+ final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().repeat(both()).times(2).groupCount().by("name").asAdmin();
new TinkerActorSystem(traversal);
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cb107388/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index 5dad18a..fa0a14d 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -35,6 +35,7 @@ import org.apache.tinkerpop.gremlin.structure.Partitioner;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -45,11 +46,13 @@ public final class WorkerTraversalActor extends AbstractActor implements
private final TraversalMatrix<?, ?> matrix;
private final Partition partition;
private final Partitioner partitioner;
+ private boolean sentHaltMessage = false;
public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition partition, final Partitioner partitioner) {
System.out.println("worker[created]: " + self().path());
this.matrix = new TraversalMatrix<>(traversal);
+ this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
this.partition = partition;
this.partitioner = partitioner;
((GraphStep) traversal.getStartStep()).setIteratorSupplier(partition::vertices);
@@ -67,12 +70,18 @@ public final class WorkerTraversalActor extends AbstractActor implements
while (step.hasNextBarrier()) {
sender().tell(new BarrierMessage(step), self());
}
- sender().tell(new BarrierSynchronizationMessage(step),self());
+ sender().tell(new BarrierSynchronizationMessage(step), self());
+ }).
+ match(SideEffectMessage.class, sideEffect -> {
+ sideEffect.setSideEffect(this.matrix.getTraversal());
}).
match(HaltSynchronizationMessage.class, haltSync -> {
- sender().tell(new HaltSynchronizationMessage(), self());
+ sender().tell(new HaltSynchronizationMessage(true), self());
}).
match(Traverser.Admin.class, traverser -> {
+ if (this.sentHaltMessage) {
+ context().parent().tell(new HaltSynchronizationMessage(false), self());
+ }
this.processTraverser(traverser);
}).build()
);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cb107388/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
index b77c129..f3419a0 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
@@ -23,4 +23,14 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class HaltSynchronizationMessage implements SynchronizationMessage {
+
+ private final boolean halt;
+
+ public HaltSynchronizationMessage(final boolean halt) {
+ this.halt = halt;
+ }
+
+ public boolean isHalt() {
+ return this.halt;
+ }
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cb107388/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
new file mode 100644
index 0000000..e9721cc
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectMessage {
+
+ private final String sideEffectKey;
+ private final Object sideEffect;
+
+ public SideEffectMessage(final String sideEffectKey, final Object sideEffect) {
+ this.sideEffect = sideEffect;
+ this.sideEffectKey = sideEffectKey;
+ }
+
+ public void setSideEffect(final Traversal.Admin<?, ?> traversal) {
+ traversal.getSideEffects().set(this.sideEffectKey, this.sideEffect);
+ }
+
+ public void addSideEffect(final Traversal.Admin<?, ?> traversal) {
+ traversal.getSideEffects().add(this.sideEffectKey, this.sideEffect);
+ }
+
+ public String getKey() {
+ return this.sideEffectKey;
+ }
+
+ public Object getValue() {
+ return this.sideEffect;
+ }
+}