You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/19 17:23:26 UTC

[03/13] tinkerpop git commit: Removed a PathProcessor.ID constraint from ComputerVerificationStrategy. Moreover, sampling and ordering is more efficient as the projected data is co-located with the traverser in the new ProjectedTraverser wrapper. Going t

Removed a PathProcessor.ID constraint from ComputerVerificationStrategy. Moreover, sampling and ordering is more efficient as the projected data is co-located with the traverser in the new ProjectedTraverser wrapper. Going to leave it at this for tp32/... Moving forward, we can make it so we don't need to DetachFactory.detach(true) for CollectingBarrierStep by maintaining 'future data.' Its complicated and I don't want to introduce potential bugs.


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b2f0c57d
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b2f0c57d
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b2f0c57d

Branch: refs/heads/master
Commit: b2f0c57df6fd9191904213622ae718a0790d7a03
Parents: 5045f67
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 18 11:07:32 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 18 11:07:32 2017 -0700

----------------------------------------------------------------------
 .../traversal/step/filter/SampleGlobalStep.java | 19 ++++++++++++--
 .../traversal/step/map/OrderGlobalStep.java     | 27 +++++---------------
 .../step/util/CollectingBarrierStep.java        | 24 ++++++++++-------
 .../ComputerVerificationStrategy.java           |  8 ------
 .../traversal/traverser/ProjectedTraverser.java | 16 +++++++-----
 .../gremlin/util/function/MultiComparator.java  | 14 +++++++---
 6 files changed, 60 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java
index 0a4da58..2b2cf20 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.lambda.ConstantTraversal;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.ProjectedTraverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
@@ -64,6 +65,15 @@ public final class SampleGlobalStep<S> extends CollectingBarrierStep<S> implemen
     }
 
     @Override
+    public void processAllStarts() {
+        if (this.starts.hasNext()) {
+            while (this.starts.hasNext()) {
+                this.traverserSet.add(this.createProjectedTraverser(this.starts.next()));
+            }
+        }
+    }
+
+    @Override
     public void barrierConsumer(final TraverserSet<S> traverserSet) {
         // return the entire traverser set if the set is smaller than the amount to sample
         if (traverserSet.bulkSize() <= this.amountToSample)
@@ -71,7 +81,7 @@ public final class SampleGlobalStep<S> extends CollectingBarrierStep<S> implemen
         //////////////// else sample the set
         double totalWeight = 0.0d;
         for (final Traverser.Admin<S> s : traverserSet) {
-            totalWeight = totalWeight + TraversalUtil.apply(s, this.probabilityTraversal).doubleValue() * s.bulk();
+            totalWeight = totalWeight + (((ProjectedTraverser<S, Number>) s).getProjections().get(0).doubleValue() * s.bulk());
         }
         ///////
         final TraverserSet<S> sampledSet = new TraverserSet<>();
@@ -82,7 +92,7 @@ public final class SampleGlobalStep<S> extends CollectingBarrierStep<S> implemen
             for (final Traverser.Admin<S> s : traverserSet) {
                 long sampleBulk = sampledSet.contains(s) ? sampledSet.get(s).bulk() : 0;
                 if (sampleBulk < s.bulk()) {
-                    final double currentWeight = TraversalUtil.apply(s, this.probabilityTraversal).doubleValue();
+                    final double currentWeight = ((ProjectedTraverser<S, Number>) s).getProjections().get(0).doubleValue();
                     for (int i = 0; i < (s.bulk() - sampleBulk); i++) {
                         runningWeight = runningWeight + currentWeight;
                         if (RANDOM.nextDouble() <= ((runningWeight / totalWeight))) {
@@ -104,6 +114,11 @@ public final class SampleGlobalStep<S> extends CollectingBarrierStep<S> implemen
         traverserSet.addAll(sampledSet);
     }
 
+
+    private final ProjectedTraverser<S, Number> createProjectedTraverser(final Traverser.Admin<S> traverser) {
+        return new ProjectedTraverser<>(traverser, Collections.singletonList(TraversalUtil.apply(traverser, this.probabilityTraversal)));
+    }
+
     @Override
     public Set<TraverserRequirement> getRequirements() {
         return this.getSelfAndChildRequirements(TraverserRequirement.BULK);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
index 9c071f1..55d8650 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
@@ -73,22 +73,11 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa
             this.multiComparator = this.createMultiComparator();
         if (this.starts.hasNext()) {
             while (this.starts.hasNext()) {
-                this.traverserSet.add(this.createOrderedTraverser(this.starts.next()));
+                this.traverserSet.add(this.createProjectedTraverser(this.starts.next()));
             }
-            this.barrierConsumer(this.traverserSet);
         }
     }
 
-    @Override
-    public Traverser.Admin<S> processNextStart() {
-        if (!this.traverserSet.isEmpty()) {
-            return this.traverserSet.remove();
-        } else if (this.starts.hasNext()) {
-            this.processAllStarts();
-        }
-        return ((ProjectedTraverser) this.traverserSet.remove()).getInternal();
-    }
-
     public void setLimit(final long limit) {
         this.limit = limit;
     }
@@ -162,18 +151,18 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa
     public MemoryComputeKey<TraverserSet<S>> getMemoryComputeKey() {
         if (null == this.multiComparator)
             this.multiComparator = this.createMultiComparator();
-        return MemoryComputeKey.of(this.getId(), new OrderBiOperator<>(this.limit, this.isShuffle, this.multiComparator), false, true);
+        return MemoryComputeKey.of(this.getId(), new OrderBiOperator<>(this.limit, this.multiComparator), false, true);
     }
 
-    private ProjectedTraverser<S> createOrderedTraverser(final Traverser.Admin<S> traverser) {
+    private final ProjectedTraverser<S,Object> createProjectedTraverser(final Traverser.Admin<S> traverser) {
         final List<Object> projections = new ArrayList<>(this.comparators.size());
         for (final Pair<Traversal.Admin<S, C>, Comparator<C>> pair : this.comparators) {
             projections.add(TraversalUtil.apply(traverser, pair.getValue0()));
         }
-        return new ProjectedTraverser<S>(traverser, projections);
+        return new ProjectedTraverser<>(traverser, projections);
     }
 
-    private MultiComparator<C> createMultiComparator() {
+    private final MultiComparator<C> createMultiComparator() {
         final List<Comparator<C>> list = new ArrayList<>(this.comparators.size());
         for (final Pair<Traversal.Admin<S, C>, Comparator<C>> pair : this.comparators) {
             list.add(pair.getValue1());
@@ -186,16 +175,14 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa
     public static final class OrderBiOperator<S> implements BinaryOperator<TraverserSet<S>>, Serializable {
 
         private long limit;
-        private boolean isShuffle;
         private MultiComparator comparator;
 
         private OrderBiOperator() {
             // for serializers that need a no-arg constructor
         }
 
-        public OrderBiOperator(final long limit, final boolean isShuffle, final MultiComparator multiComparator) {
+        public OrderBiOperator(final long limit, final MultiComparator multiComparator) {
             this.limit = limit;
-            this.isShuffle = isShuffle;
             this.comparator = multiComparator;
         }
 
@@ -203,7 +190,7 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa
         public TraverserSet<S> apply(final TraverserSet<S> setA, final TraverserSet<S> setB) {
             setA.addAll(setB);
             if (Long.MAX_VALUE != this.limit && setA.bulkSize() > this.limit) {
-                if (this.isShuffle)
+                if (this.comparator.isShuffle())
                     setA.shuffle();
                 else
                     setA.sort(this.comparator);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
index b0cce80..f99201d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
@@ -23,11 +23,13 @@ import org.apache.tinkerpop.gremlin.process.traversal.Operator;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.ProjectedTraverser;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.util.Collections;
 import java.util.NoSuchElementException;
@@ -40,7 +42,8 @@ import java.util.function.BinaryOperator;
 public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implements Barrier<TraverserSet<S>> {
 
     protected TraverserSet<S> traverserSet = new TraverserSet<>();
-    protected int maxBarrierSize;
+    private int maxBarrierSize;
+    private boolean barrierConsumed = false;
 
     public CollectingBarrierStep(final Traversal.Admin traversal) {
         this(traversal, Integer.MAX_VALUE);
@@ -68,7 +71,6 @@ public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implem
                     this.traverserSet.add(this.starts.next());
                 }
             }
-            this.barrierConsumer(this.traverserSet);
         }
     }
 
@@ -85,11 +87,10 @@ public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implem
             throw FastNoSuchElementException.instance();
         else {
             final TraverserSet<S> temp = new TraverserSet<>();
-            this.traverserSet.iterator().forEachRemaining(t -> {
+            IteratorUtils.removeOnNext(this.traverserSet.iterator()).forEachRemaining(t -> {
                 DetachedFactory.detach(t, true); // this should be dynamic
                 temp.add(t);
             });
-            this.traverserSet.clear();
             return temp;
         }
     }
@@ -98,23 +99,28 @@ public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implem
     public void addBarrier(final TraverserSet<S> barrier) {
         this.traverserSet = barrier;
         this.traverserSet.forEach(traverser -> traverser.setSideEffects(this.getTraversal().getSideEffects()));
-        this.barrierConsumer(this.traverserSet);
+        this.barrierConsumed = false;
     }
 
     @Override
     public Traverser.Admin<S> processNextStart() {
-        if (!this.traverserSet.isEmpty()) {
-            return this.traverserSet.remove();
-        } else if (this.starts.hasNext()) {
+        if (this.traverserSet.isEmpty() && this.starts.hasNext()) {
             this.processAllStarts();
+            this.barrierConsumed = false;
+        }
+        //
+        if (!this.barrierConsumed) {
+            this.barrierConsumer(this.traverserSet);
+            this.barrierConsumed = true;
         }
-        return this.traverserSet.remove();
+        return ProjectedTraverser.tryUnwrap(this.traverserSet.remove());
     }
 
     @Override
     public CollectingBarrierStep<S> clone() {
         final CollectingBarrierStep<S> clone = (CollectingBarrierStep<S>) super.clone();
         clone.traverserSet = new TraverserSet<>();
+        clone.barrierConsumed = false;
         return clone;
     }
 

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
index 5777adb..ef9b95c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
@@ -28,11 +28,9 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating;
 import org.apache.tinkerpop.gremlin.process.traversal.step.PathProcessor;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.SampleGlobalStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ProfileStep;
 import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
@@ -87,12 +85,6 @@ public final class ComputerVerificationStrategy extends AbstractTraversalStrateg
                     throw new VerificationException("Local traversals may not traverse past the local star-graph on GraphComputer: " + traversalOptional.get(), traversal);
             }
 
-            // sample step use can only operate on the element and its properties (no incidences)
-            if (step instanceof SampleGlobalStep) {
-                if (((TraversalParent) step).getLocalChildren().stream().filter(t -> !TraversalHelper.isLocalProperties(t)).findAny().isPresent())
-                    throw new VerificationException("The following barrier step can not process the incident edges of a vertex on GraphComputer: " + step, traversal);
-            }
-
             // this is a problem because sideEffect.merge() is transient on the OLAP reduction
             if (TraversalHelper.getRootTraversal(traversal).getTraverserRequirements().contains(TraverserRequirement.ONE_BULK))
                 throw new VerificationException("One bulk is currently not supported on GraphComputer: " + step, traversal);

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java
index 67e723a..5cecdc4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java
@@ -32,16 +32,16 @@ import java.util.function.Function;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class ProjectedTraverser<T> implements Traverser.Admin<T> {
+public final class ProjectedTraverser<T, P> implements Traverser.Admin<T> {
 
     private Traverser.Admin<T> internal;
-    private List<Object> projections;
+    private List<P> projections;
 
     private ProjectedTraverser() {
         // for serialization
     }
 
-    public ProjectedTraverser(final Traverser.Admin<T> internal, final List<Object> projections) {
+    public ProjectedTraverser(final Traverser.Admin<T> internal, final List<P> projections) {
         this.internal = internal;
         this.projections = projections;
     }
@@ -51,7 +51,7 @@ public final class ProjectedTraverser<T> implements Traverser.Admin<T> {
         return this.internal;
     }
 
-    public List<Object> getProjections() {
+    public List<P> getProjections() {
         return this.projections;
     }
 
@@ -187,13 +187,17 @@ public final class ProjectedTraverser<T> implements Traverser.Admin<T> {
     }
 
     @Override
-    public ProjectedTraverser<T> clone() {
+    public ProjectedTraverser<T, P> clone() {
         try {
-            final ProjectedTraverser<T> clone = (ProjectedTraverser<T>) super.clone();
+            final ProjectedTraverser<T, P> clone = (ProjectedTraverser<T, P>) super.clone();
             clone.internal = (Traverser.Admin<T>) this.internal.clone();
             return clone;
         } catch (final CloneNotSupportedException e) {
             throw new IllegalStateException(e.getMessage(), e);
         }
     }
+
+    public static <T> Traverser.Admin<T> tryUnwrap(final Traverser.Admin<T> traverser) {
+        return traverser instanceof ProjectedTraverser ? ((ProjectedTraverser) traverser).getInternal() : traverser;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java
index 427aa3d..b7176ab 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java
@@ -32,9 +32,11 @@ import java.util.List;
 public final class MultiComparator<C> implements Comparator<C>, Serializable {
 
     private final List<Comparator> comparators;
+    private final boolean isShuffle;
 
     public MultiComparator(final List<Comparator<C>> comparators) {
         this.comparators = (List) comparators;
+        this.isShuffle = !this.comparators.isEmpty() && Order.shuffle == this.comparators.get(this.comparators.size() - 1);
     }
 
     @Override
@@ -43,14 +45,20 @@ public final class MultiComparator<C> implements Comparator<C>, Serializable {
             return Order.incr.compare(objectA, objectB);
         } else {
             for (int i = 0; i < this.comparators.size(); i++) {
-                final int comparison = this.comparators.get(i).compare(this.getObject(objectA, i), this.getObject(objectB, i));
-                if (comparison != 0)
-                    return comparison;
+                if (Order.shuffle != this.comparators.get(i)) {
+                    final int comparison = this.comparators.get(i).compare(this.getObject(objectA, i), this.getObject(objectB, i));
+                    if (comparison != 0)
+                        return comparison;
+                }
             }
             return 0;
         }
     }
 
+    public boolean isShuffle() {
+        return this.isShuffle;
+    }
+
     private final Object getObject(final C object, final int index) {
         if (object instanceof ProjectedTraverser)
             return ((ProjectedTraverser) object).getProjections().get(index);