You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/12/07 16:55:19 UTC

tinkerpop git commit: I now have a general purpose aggregator pattern for Akka. I don't know if this is acceptable practice, but it works. I have implemented side-effects, but it's not working properly yet. I think I might make a SideEffectActor as a way

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1564 dbadfb935 -> cb1073884


I now have a general purpose aggregator pattern for Akka. I don't know if this is acceptable practice, but it works. I have implemented side-effects, but it's not working properly yet. I think I might make a SideEffectActor as a way of having a global side-effect blackboard. Committing what I have to save the work thus far.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/cb107388
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/cb107388
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/cb107388

Branch: refs/heads/TINKERPOP-1564
Commit: cb10738845da6d506a000d081e5d27320bf5e2e0
Parents: dbadfb9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 7 09:55:16 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Dec 7 09:55:16 2016 -0700

----------------------------------------------------------------------
 .../akka/DistributedTraversalSideEffects.java   | 148 +++++++++++++++++++
 .../process/akka/MasterTraversalActor.java      |  43 +++---
 .../process/akka/TinkerActorSystem.java         |   2 +-
 .../process/akka/WorkerTraversalActor.java      |  13 +-
 .../messages/HaltSynchronizationMessage.java    |  10 ++
 .../akka/messages/SideEffectMessage.java        |  52 +++++++
 6 files changed, 245 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cb107388/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
new file mode 100644
index 0000000..5be17c6
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/DistributedTraversalSideEffects.java
@@ -0,0 +1,148 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka;
+
+import akka.actor.ActorContext;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
+import org.apache.tinkerpop.gremlin.process.computer.ProgramPhase;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
+
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BinaryOperator;
+import java.util.function.Supplier;
+import java.util.function.UnaryOperator;
+
+/** {@link TraversalSideEffects} decorator: every call delegates to a wrapped instance except {@code add()}, which is sent to the parent actor as a message.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class DistributedTraversalSideEffects implements TraversalSideEffects {
+
+    private TraversalSideEffects sideEffects; // wrapped instance; not final because clone() reassigns it
+    private ActorContext worker;              // context of the owning actor; used only by add()
+    /** No-arg constructor for serialization; leaves both fields unset. */
+    private DistributedTraversalSideEffects() {
+        // for serialization
+    }
+    /** Wraps {@code sideEffects}; {@code worker} is the actor context whose parent receives the messages produced by add(). */
+    public DistributedTraversalSideEffects(final TraversalSideEffects sideEffects, final ActorContext worker) {
+        this.sideEffects = sideEffects;
+        this.worker = worker;
+    }
+    /** @return the wrapped (local) side-effects instance */
+    public TraversalSideEffects getSideEffects() {
+        return this.sideEffects;
+    }
+
+    @Override
+    public void set(final String key, final Object value) {
+        this.sideEffects.set(key, value);
+    }
+
+    @Override
+    public <V> V get(final String key) throws IllegalArgumentException {
+        return this.sideEffects.get(key);
+    }
+
+    @Override
+    public void remove(final String key) {
+        this.sideEffects.remove(key);
+    }
+
+    @Override
+    public Set<String> keys() {
+        return this.sideEffects.keys();
+    }
+    /** Does not touch the local state: tells the parent actor (presumably the master) a {@link SideEffectMessage}, with this actor as sender. */
+    @Override
+    public void add(final String key, final Object value) {
+        this.worker.parent().tell(new SideEffectMessage(key, value), this.worker.self()); // tell(message, sender): recipient is the parent
+    }
+
+    @Override
+    public <V> void register(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+        this.sideEffects.register(key, initialValue, reducer);
+    }
+
+    @Override
+    public <V> void registerIfAbsent(final String key, final Supplier<V> initialValue, final BinaryOperator<V> reducer) {
+        this.sideEffects.registerIfAbsent(key, initialValue, reducer);
+    }
+
+    @Override
+    public <V> BinaryOperator<V> getReducer(final String key) {
+        return this.sideEffects.getReducer(key);
+    }
+
+    @Override
+    public <V> Supplier<V> getSupplier(final String key) {
+        return this.sideEffects.getSupplier(key);
+    }
+
+    @Override
+    @Deprecated
+    public void registerSupplier(final String key, final Supplier supplier) {
+        this.sideEffects.registerSupplier(key, supplier);
+    }
+
+    @Override
+    @Deprecated
+    public <V> Optional<Supplier<V>> getRegisteredSupplier(final String key) {
+        return this.sideEffects.getRegisteredSupplier(key);
+    }
+
+    @Override
+    public <S> void setSack(final Supplier<S> initialValue, final UnaryOperator<S> splitOperator, final BinaryOperator<S> mergeOperator) {
+        this.sideEffects.setSack(initialValue, splitOperator, mergeOperator);
+    }
+
+    @Override
+    public <S> Supplier<S> getSackInitialValue() {
+        return this.sideEffects.getSackInitialValue();
+    }
+
+    @Override
+    public <S> UnaryOperator<S> getSackSplitter() {
+        return this.sideEffects.getSackSplitter();
+    }
+
+    @Override
+    public <S> BinaryOperator<S> getSackMerger() {
+        return this.sideEffects.getSackMerger();
+    }
+    /** Clones the wrapped side-effects; the {@code worker} reference is copied as-is by super.clone(), so the clone shares it. */
+    @Override
+    public TraversalSideEffects clone() {
+        try {
+            final DistributedTraversalSideEffects clone = (DistributedTraversalSideEffects) super.clone();
+            clone.sideEffects = this.sideEffects.clone();
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+
+    @Override
+    public void mergeInto(final TraversalSideEffects sideEffects) {
+        this.sideEffects.mergeInto(sideEffects);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cb107388/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
index 18e8fb3..769c77c 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/MasterTraversalActor.java
@@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.structure.Partitioner;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
 
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -48,12 +49,11 @@ import java.util.Map;
  */
 public final class MasterTraversalActor extends AbstractActor implements RequiresMessageQueue<TraverserMailbox.TraverserSetSemantics> {
 
-    private Traversal.Admin<?, ?> traversal;
-    private TraversalMatrix<?, ?> matrix;
+    private final Traversal.Admin<?, ?> traversal;
+    private final TraversalMatrix<?, ?> matrix;
     private final Partitioner partitioner;
     private List<ActorPath> workers;
-
-    private Map<String, Integer> responseCounter = new HashMap<>();
+    private final Map<String, Integer> synchronizationCounters = new HashMap<>();
 
     public MasterTraversalActor(final Traversal.Admin<?, ?> traversal, final Partitioner partitioner) {
         System.out.println("master[created]: " + self().path());
@@ -63,12 +63,6 @@ public final class MasterTraversalActor extends AbstractActor implements Require
         this.partitioner = partitioner;
         this.initializeWorkers();
 
-        /*context().system().scheduler().schedule(
-                Duration.create(1, TimeUnit.NANOSECONDS),
-                Duration.create(1, TimeUnit.NANOSECONDS),
-                () -> self().tell(System.currentTimeMillis(), ActorRef.noSender()),
-                context().system().dispatcher());*/
-
         receive(ReceiveBuilder.
                 match(Traverser.Admin.class, traverser -> {
                     this.processTraverser(traverser);
@@ -79,9 +73,9 @@ public final class MasterTraversalActor extends AbstractActor implements Require
                     broadcast(new BarrierSynchronizationMessage(barrierStep));
                 }).
                 match(BarrierSynchronizationMessage.class, barrierSync -> {
-                    final Integer counter = this.responseCounter.get(barrierSync.getStepId());
+                    final Integer counter = this.synchronizationCounters.get(barrierSync.getStepId());
                     final int newCounter = null == counter ? 1 : counter + 1;
-                    this.responseCounter.put(barrierSync.getStepId(), newCounter);
+                    this.synchronizationCounters.put(barrierSync.getStepId(), newCounter);
                     if (newCounter == this.workers.size()) {
                         final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(barrierSync.getStepId());
                         while (step.hasNext()) {
@@ -89,12 +83,21 @@ public final class MasterTraversalActor extends AbstractActor implements Require
                         }
                     }
                 }).
+                match(SideEffectMessage.class, sideEffect -> {
+                    sideEffect.addSideEffect(this.traversal);
+                    this.broadcast(new SideEffectMessage(sideEffect.getKey(), sideEffect.getValue()));
+                }).
                 match(HaltSynchronizationMessage.class, haltSync -> {
-                    final Integer counter = this.responseCounter.get(Traverser.Admin.HALT);
-                    final int newCounter = null == counter ? 1 : counter + 1;
-                    this.responseCounter.put(Traverser.Admin.HALT, newCounter);
-                    if (newCounter == this.workers.size()) {
-                        context().system().terminate();
+                    if (haltSync.isHalt()) {
+                        final Integer counter = this.synchronizationCounters.get(Traverser.Admin.HALT);
+                        final int newCounter = null == counter ? 1 : counter + 1;
+                        this.synchronizationCounters.put(Traverser.Admin.HALT, newCounter);
+                        if (newCounter == this.workers.size()) {
+                            context().system().terminate();
+                        }
+                    } else {
+                        this.synchronizationCounters.remove(Traverser.Admin.HALT);
+                        this.broadcast(new HaltSynchronizationMessage(true));
                     }
                 }).build());
     }
@@ -113,18 +116,18 @@ public final class MasterTraversalActor extends AbstractActor implements Require
 
     private void broadcast(final Object message) {
         for (final ActorPath worker : this.workers) {
-            context().actorSelection(worker).tell(message,self());
+            context().actorSelection(worker).tell(message, self());
         }
     }
 
     private void processTraverser(final Traverser.Admin traverser) {
         if (traverser.isHalted()) {
             System.out.println("master[result]: " + traverser);
-            broadcast(new HaltSynchronizationMessage());
+            broadcast(new HaltSynchronizationMessage(true));
         } else {
             if (traverser.get() instanceof Element) {
                 final Partition otherPartition = this.partitioner.getPartition((Element) traverser.get());
-                context().actorSelection("worker-" + otherPartition.hashCode()).tell(traverser,self());
+                context().actorSelection("worker-" + otherPartition.hashCode()).tell(traverser, self());
             } else {
                 final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
                 step.addStart(traverser);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cb107388/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
index ac21bcd..6fc370b 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/TinkerActorSystem.java
@@ -53,7 +53,7 @@ public final class TinkerActorSystem {
                 as("a").out().as("b"),
                 as("b").in().as("c"),
                 as("b").has("name", P.eq("lop"))).where("a", P.neq("c")).select("a", "b", "c").by("name").asAdmin();*/
-        final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().repeat(both()).times(2).groupCount().select(Column.keys).unfold().valueMap().count().asAdmin();
+        final Traversal.Admin<?, ?> traversal = graph.traversal().withComputer().V().repeat(both()).times(2).groupCount().by("name").asAdmin();
         new TinkerActorSystem(traversal);
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cb107388/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
index 5dad18a..fa0a14d 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/WorkerTraversalActor.java
@@ -35,6 +35,7 @@ import org.apache.tinkerpop.gremlin.structure.Partitioner;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.BarrierSynchronizationMessage;
 import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.HaltSynchronizationMessage;
+import org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages.SideEffectMessage;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -45,11 +46,13 @@ public final class WorkerTraversalActor extends AbstractActor implements
     private final TraversalMatrix<?, ?> matrix;
     private final Partition partition;
     private final Partitioner partitioner;
+    private boolean sentHaltMessage = false;
 
 
     public WorkerTraversalActor(final Traversal.Admin<?, ?> traversal, final Partition partition, final Partitioner partitioner) {
         System.out.println("worker[created]: " + self().path());
         this.matrix = new TraversalMatrix<>(traversal);
+        this.matrix.getTraversal().setSideEffects(new DistributedTraversalSideEffects(this.matrix.getTraversal().getSideEffects(), context()));
         this.partition = partition;
         this.partitioner = partitioner;
         ((GraphStep) traversal.getStartStep()).setIteratorSupplier(partition::vertices);
@@ -67,12 +70,18 @@ public final class WorkerTraversalActor extends AbstractActor implements
                     while (step.hasNextBarrier()) {
                         sender().tell(new BarrierMessage(step), self());
                     }
-                    sender().tell(new BarrierSynchronizationMessage(step),self());
+                    sender().tell(new BarrierSynchronizationMessage(step), self());
+                }).
+                match(SideEffectMessage.class, sideEffect -> {
+                    sideEffect.setSideEffect(this.matrix.getTraversal());
                 }).
                 match(HaltSynchronizationMessage.class, haltSync -> {
-                    sender().tell(new HaltSynchronizationMessage(), self());
+                    sender().tell(new HaltSynchronizationMessage(true), self());
                 }).
                 match(Traverser.Admin.class, traverser -> {
+                    if (this.sentHaltMessage) {
+                        context().parent().tell(new HaltSynchronizationMessage(false), self());
+                    }
                     this.processTraverser(traverser);
                 }).build()
         );

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cb107388/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
index b77c129..f3419a0 100644
--- a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/HaltSynchronizationMessage.java
@@ -23,4 +23,14 @@ package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 public final class HaltSynchronizationMessage implements SynchronizationMessage {
+    // Flag read by the master's handler in this commit: true is counted toward termination; false clears the HALT counter and triggers a re-broadcast of a halt request.
+    private final boolean halt;
+    /** Creates a message carrying the given halt flag. */
+    public HaltSynchronizationMessage(final boolean halt) {
+        this.halt = halt;
+    }
+    /** @return the halt flag supplied at construction */
+    public boolean isHalt() {
+        return this.halt;
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/cb107388/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
new file mode 100644
index 0000000..e9721cc
--- /dev/null
+++ b/tinkergraph-gremlin/src/test/java/org/apache/tinkerpop/gremlin/tinkergraph/process/akka/messages/SideEffectMessage.java
@@ -0,0 +1,52 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.tinkergraph.process.akka.messages;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+
+/** Actor message pairing a side-effect key with a value; it can apply itself to a traversal's side-effects through either set or add.
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectMessage {
+
+    private final String key;   // name of the side-effect
+    private final Object value; // payload handed to set()/add()
+
+    public SideEffectMessage(final String sideEffectKey, final Object sideEffect) {
+        this.key = sideEffectKey;
+        this.value = sideEffect;
+    }
+    /** Calls {@code set(key, value)} on the side-effects of {@code traversal}. */
+    public void setSideEffect(final Traversal.Admin<?, ?> traversal) {
+        traversal.getSideEffects().set(key, value);
+    }
+    /** Calls {@code add(key, value)} on the side-effects of {@code traversal}. */
+    public void addSideEffect(final Traversal.Admin<?, ?> traversal) {
+        traversal.getSideEffects().add(key, value);
+    }
+    /** @return the side-effect key given at construction */
+    public String getKey() {
+        return key;
+    }
+    /** @return the side-effect value given at construction */
+    public Object getValue() {
+        return value;
+    }
+}