You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/19 17:23:26 UTC
[03/13] tinkerpop git commit: Removed a PathProcessor.ID constraint
from ComputerVerificationStrategy. Moreover,
sampling and ordering are more efficient as the projected data is co-located
with the traverser in the new ProjectedTraverser wrapper. Going t[...]
Removed a PathProcessor.ID constraint from ComputerVerificationStrategy. Moreover, sampling and ordering are more efficient because the projected data is co-located with the traverser in the new ProjectedTraverser wrapper. Going to leave it at this for tp32/... Moving forward, we can make it so we don't need to call DetachedFactory.detach(t, true) for CollectingBarrierStep by maintaining 'future data.' It's complicated and I don't want to introduce potential bugs.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/b2f0c57d
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/b2f0c57d
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/b2f0c57d
Branch: refs/heads/master
Commit: b2f0c57df6fd9191904213622ae718a0790d7a03
Parents: 5045f67
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 18 11:07:32 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 18 11:07:32 2017 -0700
----------------------------------------------------------------------
.../traversal/step/filter/SampleGlobalStep.java | 19 ++++++++++++--
.../traversal/step/map/OrderGlobalStep.java | 27 +++++---------------
.../step/util/CollectingBarrierStep.java | 24 ++++++++++-------
.../ComputerVerificationStrategy.java | 8 ------
.../traversal/traverser/ProjectedTraverser.java | 16 +++++++-----
.../gremlin/util/function/MultiComparator.java | 14 +++++++---
6 files changed, 60 insertions(+), 48 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java
index 0a4da58..2b2cf20 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/filter/SampleGlobalStep.java
@@ -24,6 +24,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.lambda.ConstantTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.ProjectedTraverser;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
@@ -64,6 +65,15 @@ public final class SampleGlobalStep<S> extends CollectingBarrierStep<S> implemen
}
@Override
+ public void processAllStarts() {
+ if (this.starts.hasNext()) {
+ while (this.starts.hasNext()) {
+ this.traverserSet.add(this.createProjectedTraverser(this.starts.next()));
+ }
+ }
+ }
+
+ @Override
public void barrierConsumer(final TraverserSet<S> traverserSet) {
// return the entire traverser set if the set is smaller than the amount to sample
if (traverserSet.bulkSize() <= this.amountToSample)
@@ -71,7 +81,7 @@ public final class SampleGlobalStep<S> extends CollectingBarrierStep<S> implemen
//////////////// else sample the set
double totalWeight = 0.0d;
for (final Traverser.Admin<S> s : traverserSet) {
- totalWeight = totalWeight + TraversalUtil.apply(s, this.probabilityTraversal).doubleValue() * s.bulk();
+ totalWeight = totalWeight + (((ProjectedTraverser<S, Number>) s).getProjections().get(0).doubleValue() * s.bulk());
}
///////
final TraverserSet<S> sampledSet = new TraverserSet<>();
@@ -82,7 +92,7 @@ public final class SampleGlobalStep<S> extends CollectingBarrierStep<S> implemen
for (final Traverser.Admin<S> s : traverserSet) {
long sampleBulk = sampledSet.contains(s) ? sampledSet.get(s).bulk() : 0;
if (sampleBulk < s.bulk()) {
- final double currentWeight = TraversalUtil.apply(s, this.probabilityTraversal).doubleValue();
+ final double currentWeight = ((ProjectedTraverser<S, Number>) s).getProjections().get(0).doubleValue();
for (int i = 0; i < (s.bulk() - sampleBulk); i++) {
runningWeight = runningWeight + currentWeight;
if (RANDOM.nextDouble() <= ((runningWeight / totalWeight))) {
@@ -104,6 +114,11 @@ public final class SampleGlobalStep<S> extends CollectingBarrierStep<S> implemen
traverserSet.addAll(sampledSet);
}
+
+ private final ProjectedTraverser<S, Number> createProjectedTraverser(final Traverser.Admin<S> traverser) {
+ return new ProjectedTraverser<>(traverser, Collections.singletonList(TraversalUtil.apply(traverser, this.probabilityTraversal)));
+ }
+
@Override
public Set<TraverserRequirement> getRequirements() {
return this.getSelfAndChildRequirements(TraverserRequirement.BULK);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
index 9c071f1..55d8650 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/OrderGlobalStep.java
@@ -73,22 +73,11 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa
this.multiComparator = this.createMultiComparator();
if (this.starts.hasNext()) {
while (this.starts.hasNext()) {
- this.traverserSet.add(this.createOrderedTraverser(this.starts.next()));
+ this.traverserSet.add(this.createProjectedTraverser(this.starts.next()));
}
- this.barrierConsumer(this.traverserSet);
}
}
- @Override
- public Traverser.Admin<S> processNextStart() {
- if (!this.traverserSet.isEmpty()) {
- return this.traverserSet.remove();
- } else if (this.starts.hasNext()) {
- this.processAllStarts();
- }
- return ((ProjectedTraverser) this.traverserSet.remove()).getInternal();
- }
-
public void setLimit(final long limit) {
this.limit = limit;
}
@@ -162,18 +151,18 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa
public MemoryComputeKey<TraverserSet<S>> getMemoryComputeKey() {
if (null == this.multiComparator)
this.multiComparator = this.createMultiComparator();
- return MemoryComputeKey.of(this.getId(), new OrderBiOperator<>(this.limit, this.isShuffle, this.multiComparator), false, true);
+ return MemoryComputeKey.of(this.getId(), new OrderBiOperator<>(this.limit, this.multiComparator), false, true);
}
- private ProjectedTraverser<S> createOrderedTraverser(final Traverser.Admin<S> traverser) {
+ private final ProjectedTraverser<S,Object> createProjectedTraverser(final Traverser.Admin<S> traverser) {
final List<Object> projections = new ArrayList<>(this.comparators.size());
for (final Pair<Traversal.Admin<S, C>, Comparator<C>> pair : this.comparators) {
projections.add(TraversalUtil.apply(traverser, pair.getValue0()));
}
- return new ProjectedTraverser<S>(traverser, projections);
+ return new ProjectedTraverser<>(traverser, projections);
}
- private MultiComparator<C> createMultiComparator() {
+ private final MultiComparator<C> createMultiComparator() {
final List<Comparator<C>> list = new ArrayList<>(this.comparators.size());
for (final Pair<Traversal.Admin<S, C>, Comparator<C>> pair : this.comparators) {
list.add(pair.getValue1());
@@ -186,16 +175,14 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa
public static final class OrderBiOperator<S> implements BinaryOperator<TraverserSet<S>>, Serializable {
private long limit;
- private boolean isShuffle;
private MultiComparator comparator;
private OrderBiOperator() {
// for serializers that need a no-arg constructor
}
- public OrderBiOperator(final long limit, final boolean isShuffle, final MultiComparator multiComparator) {
+ public OrderBiOperator(final long limit, final MultiComparator multiComparator) {
this.limit = limit;
- this.isShuffle = isShuffle;
this.comparator = multiComparator;
}
@@ -203,7 +190,7 @@ public final class OrderGlobalStep<S, C extends Comparable> extends CollectingBa
public TraverserSet<S> apply(final TraverserSet<S> setA, final TraverserSet<S> setB) {
setA.addAll(setB);
if (Long.MAX_VALUE != this.limit && setA.bulkSize() > this.limit) {
- if (this.isShuffle)
+ if (this.comparator.isShuffle())
setA.shuffle();
else
setA.sort(this.comparator);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
index b0cce80..f99201d 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/CollectingBarrierStep.java
@@ -23,11 +23,13 @@ import org.apache.tinkerpop.gremlin.process.traversal.Operator;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.ProjectedTraverser;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedFactory;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.util.Collections;
import java.util.NoSuchElementException;
@@ -40,7 +42,8 @@ import java.util.function.BinaryOperator;
public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implements Barrier<TraverserSet<S>> {
protected TraverserSet<S> traverserSet = new TraverserSet<>();
- protected int maxBarrierSize;
+ private int maxBarrierSize;
+ private boolean barrierConsumed = false;
public CollectingBarrierStep(final Traversal.Admin traversal) {
this(traversal, Integer.MAX_VALUE);
@@ -68,7 +71,6 @@ public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implem
this.traverserSet.add(this.starts.next());
}
}
- this.barrierConsumer(this.traverserSet);
}
}
@@ -85,11 +87,10 @@ public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implem
throw FastNoSuchElementException.instance();
else {
final TraverserSet<S> temp = new TraverserSet<>();
- this.traverserSet.iterator().forEachRemaining(t -> {
+ IteratorUtils.removeOnNext(this.traverserSet.iterator()).forEachRemaining(t -> {
DetachedFactory.detach(t, true); // this should be dynamic
temp.add(t);
});
- this.traverserSet.clear();
return temp;
}
}
@@ -98,23 +99,28 @@ public abstract class CollectingBarrierStep<S> extends AbstractStep<S, S> implem
public void addBarrier(final TraverserSet<S> barrier) {
this.traverserSet = barrier;
this.traverserSet.forEach(traverser -> traverser.setSideEffects(this.getTraversal().getSideEffects()));
- this.barrierConsumer(this.traverserSet);
+ this.barrierConsumed = false;
}
@Override
public Traverser.Admin<S> processNextStart() {
- if (!this.traverserSet.isEmpty()) {
- return this.traverserSet.remove();
- } else if (this.starts.hasNext()) {
+ if (this.traverserSet.isEmpty() && this.starts.hasNext()) {
this.processAllStarts();
+ this.barrierConsumed = false;
+ }
+ //
+ if (!this.barrierConsumed) {
+ this.barrierConsumer(this.traverserSet);
+ this.barrierConsumed = true;
}
- return this.traverserSet.remove();
+ return ProjectedTraverser.tryUnwrap(this.traverserSet.remove());
}
@Override
public CollectingBarrierStep<S> clone() {
final CollectingBarrierStep<S> clone = (CollectingBarrierStep<S>) super.clone();
clone.traverserSet = new TraverserSet<>();
+ clone.barrierConsumed = false;
return clone;
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
index 5777adb..ef9b95c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/strategy/verification/ComputerVerificationStrategy.java
@@ -28,11 +28,9 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.GraphComputing;
import org.apache.tinkerpop.gremlin.process.traversal.step.Mutating;
import org.apache.tinkerpop.gremlin.process.traversal.step.PathProcessor;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
-import org.apache.tinkerpop.gremlin.process.traversal.step.filter.SampleGlobalStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.InjectStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SubgraphStep;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ProfileStep;
import org.apache.tinkerpop.gremlin.process.traversal.strategy.AbstractTraversalStrategy;
@@ -87,12 +85,6 @@ public final class ComputerVerificationStrategy extends AbstractTraversalStrateg
throw new VerificationException("Local traversals may not traverse past the local star-graph on GraphComputer: " + traversalOptional.get(), traversal);
}
- // sample step use can only operate on the element and its properties (no incidences)
- if (step instanceof SampleGlobalStep) {
- if (((TraversalParent) step).getLocalChildren().stream().filter(t -> !TraversalHelper.isLocalProperties(t)).findAny().isPresent())
- throw new VerificationException("The following barrier step can not process the incident edges of a vertex on GraphComputer: " + step, traversal);
- }
-
// this is a problem because sideEffect.merge() is transient on the OLAP reduction
if (TraversalHelper.getRootTraversal(traversal).getTraverserRequirements().contains(TraverserRequirement.ONE_BULK))
throw new VerificationException("One bulk is currently not supported on GraphComputer: " + step, traversal);
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java
index 67e723a..5cecdc4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/traverser/ProjectedTraverser.java
@@ -32,16 +32,16 @@ import java.util.function.Function;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class ProjectedTraverser<T> implements Traverser.Admin<T> {
+public final class ProjectedTraverser<T, P> implements Traverser.Admin<T> {
private Traverser.Admin<T> internal;
- private List<Object> projections;
+ private List<P> projections;
private ProjectedTraverser() {
// for serialization
}
- public ProjectedTraverser(final Traverser.Admin<T> internal, final List<Object> projections) {
+ public ProjectedTraverser(final Traverser.Admin<T> internal, final List<P> projections) {
this.internal = internal;
this.projections = projections;
}
@@ -51,7 +51,7 @@ public final class ProjectedTraverser<T> implements Traverser.Admin<T> {
return this.internal;
}
- public List<Object> getProjections() {
+ public List<P> getProjections() {
return this.projections;
}
@@ -187,13 +187,17 @@ public final class ProjectedTraverser<T> implements Traverser.Admin<T> {
}
@Override
- public ProjectedTraverser<T> clone() {
+ public ProjectedTraverser<T, P> clone() {
try {
- final ProjectedTraverser<T> clone = (ProjectedTraverser<T>) super.clone();
+ final ProjectedTraverser<T, P> clone = (ProjectedTraverser<T, P>) super.clone();
clone.internal = (Traverser.Admin<T>) this.internal.clone();
return clone;
} catch (final CloneNotSupportedException e) {
throw new IllegalStateException(e.getMessage(), e);
}
}
+
+ public static <T> Traverser.Admin<T> tryUnwrap(final Traverser.Admin<T> traverser) {
+ return traverser instanceof ProjectedTraverser ? ((ProjectedTraverser) traverser).getInternal() : traverser;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/b2f0c57d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java
index 427aa3d..b7176ab 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/util/function/MultiComparator.java
@@ -32,9 +32,11 @@ import java.util.List;
public final class MultiComparator<C> implements Comparator<C>, Serializable {
private final List<Comparator> comparators;
+ private final boolean isShuffle;
public MultiComparator(final List<Comparator<C>> comparators) {
this.comparators = (List) comparators;
+ this.isShuffle = !this.comparators.isEmpty() && Order.shuffle == this.comparators.get(this.comparators.size() - 1);
}
@Override
@@ -43,14 +45,20 @@ public final class MultiComparator<C> implements Comparator<C>, Serializable {
return Order.incr.compare(objectA, objectB);
} else {
for (int i = 0; i < this.comparators.size(); i++) {
- final int comparison = this.comparators.get(i).compare(this.getObject(objectA, i), this.getObject(objectB, i));
- if (comparison != 0)
- return comparison;
+ if (Order.shuffle != this.comparators.get(i)) {
+ final int comparison = this.comparators.get(i).compare(this.getObject(objectA, i), this.getObject(objectB, i));
+ if (comparison != 0)
+ return comparison;
+ }
}
return 0;
}
}
+ public boolean isShuffle() {
+ return this.isShuffle;
+ }
+
private final Object getObject(final C object, final int index) {
if (object instanceof ProjectedTraverser)
return ((ProjectedTraverser) object).getProjections().get(index);