You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/04 13:58:51 UTC

[33/50] [abbrv] tinkerpop git commit: came up with a clever idea for maintaining order in a distributed traversal. OrdereredTraverser is a wrapper around a Traverser where all methods are delegated to the internal traverser. However, OrderedTraverser has

came up with a clever idea for maintaining order in a distributed traversal. OrdereredTraverser is a wrapper around a Traverser where all methods are delegated to the internal traverser. However, OrderedTraverser has an order() method which returns an int. This allows an ordered stream of traversers to be distributed across machines and then, on return, ordered accordingly. This is much better than the GraphComputer model we have. With OrderedTraverser, we have now exposed OrderTest, DedupTest, and TailTest to gremlin-akka. The final obstacle is nested group()s. There is something super fishy going on with cloning and I'm scared of just banging my head against the wall all morning so I will just let it simmer.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/1a66be32
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/1a66be32
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/1a66be32

Branch: refs/heads/TINKERPOP-1564
Commit: 1a66be32960b8305eb81a8f7f59b065bc3aed386
Parents: 3d4fb35
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Dec 14 05:54:03 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 4 05:07:59 2017 -0700

----------------------------------------------------------------------
 .../akka/process/actor/ActorMailbox.java        |  25 +--
 .../akka/process/AkkaActorsProvider.java        |   7 +-
 .../actor/traversal/TraversalMasterProgram.java |  33 +++-
 .../traverser/util/OrderedTraverser.java        | 178 +++++++++++++++++++
 4 files changed, 222 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1a66be32/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java
index 28afb22..c8e5fde 100644
--- a/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java
+++ b/akka-gremlin/src/main/java/org/apache/tinkerpop/gremlin/akka/process/actor/ActorMailbox.java
@@ -31,10 +31,8 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSe
 import scala.Option;
 
 import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
 
 /**
@@ -45,31 +43,34 @@ public final class ActorMailbox implements MailboxType, ProducesMessageQueue<Act
     private final List<Class> messagePriorities = new ArrayList<>();
 
     public static class ActorMessageQueue implements MessageQueue, ActorSemantics {
+        private final List<Class> messagePriorities;
         private final List<Queue> messages;
-        private final Map<Class, Queue> priorities;
         private final Object MUTEX = new Object();
 
         public ActorMessageQueue(final List<Class> messagePriorities) {
-            this.messages = new ArrayList<>(messagePriorities.size());
-            this.priorities = new HashMap<>(messagePriorities.size());
-            for (final Class clazz : messagePriorities) {
+            this.messagePriorities = messagePriorities;
+            this.messages = new ArrayList<>(this.messagePriorities.size());
+            for (final Class clazz : this.messagePriorities) {
                 final Queue queue;
                 if (Traverser.class.isAssignableFrom(clazz))
                     queue = new TraverserSet<>();
                 else
                     queue = new LinkedList<>();
                 this.messages.add(queue);
-                this.priorities.put(clazz, queue);
             }
         }
 
         public void enqueue(final ActorRef receiver, final Envelope handle) {
             synchronized (MUTEX) {
-                final Queue queue = this.priorities.get(handle.message() instanceof Traverser ? Traverser.class : handle.message().getClass());
-                if (null == queue)
-                    throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass());
-                else
-                    queue.offer(handle.message() instanceof Traverser ? handle.message() : handle);
+                final Object message = handle.message();
+                for (int i = 0; i < this.messagePriorities.size(); i++) {
+                    final Class clazz = this.messagePriorities.get(i);
+                    if (clazz.isInstance(message)) {
+                        this.messages.get(i).offer(message instanceof Traverser ? message : handle);
+                        return;
+                    }
+                }
+                throw new IllegalArgumentException("The provided message type is not registered: " + handle.message().getClass());
             }
         }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1a66be32/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
----------------------------------------------------------------------
diff --git a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
index a0703bd..9c8b320 100644
--- a/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
+++ b/akka-gremlin/src/test/java/org/apache/tinkerpop/gremlin/akka/process/AkkaActorsProvider.java
@@ -63,20 +63,15 @@ import java.util.Set;
  */
 public class AkkaActorsProvider extends AbstractGraphProvider {
 
-    protected static final boolean IMPORT_STATICS = new Random().nextBoolean();
-
     private static Set<String> SKIP_TESTS = new HashSet<>(Arrays.asList(
             "g_V_hasLabelXpersonX_V_hasLabelXsoftwareX_name",
             "g_VX1X_repeatXbothEXcreatedX_whereXwithoutXeXX_aggregateXeX_otherVX_emit_path",
-            "g_V_outXfollowedByX_group_byXsongTypeX_byXbothE_group_byXlabelX_byXweight_sumXX",
             "g_withBulkXfalseX_withSackX1_sumX_V_out_barrier_sack",
             "g_V_both_groupCountXaX_out_capXaX_selectXkeysX_unfold_both_groupCountXaX_capXaX",
+            "g_V_both_both_dedup_byXoutE_countX_name",
             GraphTest.Traversals.class.getCanonicalName(),
-            DedupTest.Traversals.class.getCanonicalName(),
-            OrderTest.Traversals.class.getCanonicalName(),
             GroupTest.Traversals.class.getCanonicalName(),
             ComplexTest.Traversals.class.getCanonicalName(),
-            TailTest.Traversals.class.getCanonicalName(),
             SubgraphTest.Traversals.class.getCanonicalName(),
             SideEffectTest.Traversals.class.getCanonicalName(),
             SubgraphStrategyProcessTest.class.getCanonicalName(),

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1a66be32/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
index 1c44b51..723339d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/actor/traversal/TraversalMasterProgram.java
@@ -34,6 +34,11 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.LocalBarrier;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.RangeGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.filter.TailGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.map.OrderGlobalStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.OrderedTraverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Element;
@@ -47,6 +52,7 @@ import java.util.Map;
  */
 final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
 
+
     private final Actor.Master master;
     private final Traversal.Admin<?, ?> traversal;
     private final TraversalMatrix<?, ?> matrix;
@@ -54,6 +60,7 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
     private Map<String, Barrier> barriers = new HashMap<>();
     private final TraverserSet<?> results;
     private Address.Worker leaderWorker;
+    private int orderCounter = -1;
 
     public TraversalMasterProgram(final Actor.Master master, final Traversal.Admin<?, ?> traversal, final Partitioner partitioner, final TraverserSet<?> results) {
         this.traversal = traversal;
@@ -90,8 +97,12 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
                 for (final Barrier barrier : this.barriers.values()) {
                     final Step<?, ?> step = (Step) barrier;
                     if (!(barrier instanceof LocalBarrier)) {
+                        this.orderBarrier(step);
+                        if (step instanceof OrderGlobalStep) this.orderCounter = 0;
                         while (step.hasNext()) {
-                            this.sendTraverser(step.next());
+                            this.sendTraverser(-1 == this.orderCounter ?
+                                    step.next() :
+                                    new OrderedTraverser<>(step.next(), this.orderCounter++));
                         }
                     } else {
                         this.traversal.getSideEffects().forEach((k, v) -> {
@@ -107,6 +118,9 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
                 while (this.traversal.hasNext()) {
                     this.results.add((Traverser.Admin) this.traversal.nextTraverser());
                 }
+                if (this.orderCounter != -1)
+                    this.results.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order()));
+
                 this.master.close();
             }
         } else {
@@ -132,8 +146,12 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
             final Step<?, ?> step = this.matrix.<Object, Object, Step<Object, Object>>getStepById(traverser.getStepId());
             GraphComputing.atMaster(step, true);
             step.addStart(traverser);
-            while (step.hasNext()) {
-                this.processTraverser(step.next());
+            if (step instanceof Barrier) {
+                this.barriers.put(step.getId(), (Barrier) step);
+            } else {
+                while (step.hasNext()) {
+                    this.processTraverser(step.next());
+                }
             }
         }
     }
@@ -146,4 +164,13 @@ final class TraversalMasterProgram<M> implements ActorProgram.Master<M> {
         else
             this.master.send(this.master.address(), traverser);
     }
+
+    private void orderBarrier(final Step step) {
+        if (this.orderCounter != -1 && step instanceof Barrier && (step instanceof RangeGlobalStep || step instanceof TailGlobalStep)) {
+            final Barrier barrier = (Barrier) step;
+            final TraverserSet<?> rangingBarrier = (TraverserSet<?>) barrier.nextBarrier();
+            rangingBarrier.sort((a, b) -> Integer.compare(((OrderedTraverser<?>) a).order(), ((OrderedTraverser<?>) b).order()));
+            barrier.addBarrier(rangingBarrier);
+        }
+    }
 }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/1a66be32/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/OrderedTraverser.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/OrderedTraverser.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/OrderedTraverser.java
new file mode 100644
index 0000000..3be67a2
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/util/OrderedTraverser.java
@@ -0,0 +1,178 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.tinkerpop.gremlin.process.traversal.traverser.util;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Path;
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+
+import java.util.Set;
+import java.util.function.Function;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class OrderedTraverser<T> implements Traverser.Admin<T> {
+
+    private Traverser.Admin<T> internal;
+    private final int order;
+
+    public OrderedTraverser(final Traverser.Admin<T> internal, final int order) {
+        this.internal = internal instanceof OrderedTraverser ? ((OrderedTraverser) internal).internal : internal;
+        this.order = order;
+    }
+
+    public int order() {
+        return this.order;
+    }
+
+    @Override
+    public void merge(final Admin<?> other) {
+        this.internal.merge(other);
+    }
+
+    @Override
+    public <R> Admin<R> split(R r, Step<T, R> step) {
+        return new OrderedTraverser<>(this.internal.split(r, step), this.order);
+    }
+
+    @Override
+    public Admin<T> split() {
+        return new OrderedTraverser<>(this.internal.split(), this.order);
+    }
+
+    @Override
+    public void addLabels(final Set<String> labels) {
+        this.internal.addLabels(labels);
+    }
+
+    @Override
+    public void keepLabels(final Set<String> labels) {
+        this.internal.keepLabels(labels);
+    }
+
+    @Override
+    public void dropLabels(final Set<String> labels) {
+        this.internal.dropLabels(labels);
+    }
+
+    @Override
+    public void dropPath() {
+        this.internal.dropPath();
+    }
+
+    @Override
+    public void set(final T t) {
+        this.internal.set(t);
+    }
+
+    @Override
+    public void incrLoops(final String stepLabel) {
+        this.internal.incrLoops(stepLabel);
+    }
+
+    @Override
+    public void resetLoops() {
+        this.internal.resetLoops();
+    }
+
+    @Override
+    public String getStepId() {
+        return this.internal.getStepId();
+    }
+
+    @Override
+    public void setStepId(final String stepId) {
+        this.internal.setStepId(stepId);
+    }
+
+    @Override
+    public void setBulk(final long count) {
+        this.internal.setBulk(count);
+    }
+
+    @Override
+    public Admin<T> detach() {
+        return this.internal.detach();
+    }
+
+    @Override
+    public T attach(final Function<Attachable<T>, T> method) {
+        return this.internal.attach(method);
+    }
+
+    @Override
+    public void setSideEffects(final TraversalSideEffects sideEffects) {
+        this.internal.setSideEffects(sideEffects);
+    }
+
+    @Override
+    public TraversalSideEffects getSideEffects() {
+        return this.internal.getSideEffects();
+    }
+
+    @Override
+    public Set<String> getTags() {
+        return this.internal.getTags();
+    }
+
+    @Override
+    public T get() {
+        return this.internal.get();
+    }
+
+    @Override
+    public <S> S sack() {
+        return this.internal.sack();
+    }
+
+    @Override
+    public <S> void sack(final S object) {
+        this.internal.sack(object);
+    }
+
+    @Override
+    public Path path() {
+        return this.internal.path();
+    }
+
+    @Override
+    public int loops() {
+        return this.internal.loops();
+    }
+
+    @Override
+    public long bulk() {
+        return this.internal.bulk();
+    }
+
+    @Override
+    public Traverser<T> clone() {
+        try {
+            final OrderedTraverser<T> clone = (OrderedTraverser<T>) super.clone();
+            clone.internal = (Traverser.Admin<T>) this.internal.clone();
+            return clone;
+        } catch (final CloneNotSupportedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
+        }
+    }
+}