You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/18 21:17:40 UTC
tinkerpop git commit: Got a super simple implementation of GroupStep working. Tada. I think I can get it even simpler too....
Repository: tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1606 [created] f53327b7f
Got a super simple implementation of GroupStep working. Tada. I think I can get it even simpler too....
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f53327b7
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f53327b7
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f53327b7
Branch: refs/heads/TINKERPOP-1606
Commit: f53327b7fe7ba927a04400739d30b9f28ec85ac2
Parents: d1f0889
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 18 14:17:35 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 18 14:17:35 2017 -0700
----------------------------------------------------------------------
.../process/traversal/step/map/GroupStep.java | 191 ++++---------------
.../step/sideEffect/GroupSideEffectStep.java | 20 +-
2 files changed, 49 insertions(+), 162 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f53327b7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index d6ce421..4d229d1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -31,7 +31,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
@@ -40,7 +40,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.javatuples.Pair;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -90,15 +89,19 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
public Map<K, V> projectTraverser(final Traverser.Admin<S> traverser) {
final Map<K, V> map = new HashMap<>(1);
if (null == this.preTraversal) {
- map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverser);
+ map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) new TraverserSet<>(traverser));
} else {
- final TraverserSet traverserSet = new TraverserSet<>();
this.preTraversal.reset();
this.preTraversal.addStart(traverser);
- while (this.preTraversal.hasNext()) {
- traverserSet.add(this.preTraversal.nextTraverser());
- }
- map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet);
+ final Barrier barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.preTraversal).orElse(null);
+ if (null == barrierStep) {
+ final TraverserSet set = new TraverserSet();
+ while (this.preTraversal.hasNext()) {
+ set.add(this.preTraversal.nextTraverser());
+ }
+ map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) set);
+ } else if (barrierStep.hasNextBarrier())
+ map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) barrierStep.nextBarrier());
}
return map;
}
@@ -160,20 +163,18 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable, Cloneable {
- // size limit before Barrier.processAllStarts() to lazy reduce
- private static final int SIZE_LIMIT = 1000;
-
- private Traversal.Admin<?, V> valueTraversal;
private Barrier barrierStep;
public GroupBiOperator(final Traversal.Admin<?, V> valueTraversal) {
// if there is a lambda that can not be serialized, then simply use TraverserSets
- if (TraversalHelper.hasStepOfAssignableClassRecursively(LambdaHolder.class, valueTraversal)) {
- this.valueTraversal = null;
+ if (TraversalHelper.hasStepOfAssignableClassRecursively(LambdaHolder.class, valueTraversal))
this.barrierStep = null;
- } else {
- this.valueTraversal = valueTraversal.clone();
- this.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.valueTraversal).orElse(null);
+ else {
+ this.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal.clone()).orElse(null);
+ if (this.barrierStep instanceof CollectingBarrierStep)
+ this.barrierStep = null;
+ if (null != this.barrierStep)
+ this.barrierStep = (Barrier) ((Step) this.barrierStep).clone();
}
}
@@ -185,10 +186,8 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
public GroupBiOperator<K, V> clone() {
try {
final GroupBiOperator<K, V> clone = (GroupBiOperator<K, V>) super.clone();
- if (null != this.valueTraversal) {
- clone.valueTraversal = this.valueTraversal.clone();
- clone.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, clone.valueTraversal).orElse(null);
- }
+ if (null != this.barrierStep)
+ clone.barrierStep = (Barrier) ((Step) this.barrierStep).clone();
return clone;
} catch (final CloneNotSupportedException e) {
throw new IllegalStateException(e.getMessage(), e);
@@ -200,120 +199,17 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
for (final K key : mapB.keySet()) {
Object objectA = mapA.get(key);
final Object objectB = mapB.get(key);
- assert null != objectB;
if (null == objectA) {
objectA = objectB;
+ } else if (null == objectB) {
+
} else {
- // TRAVERSER
- if (objectA instanceof Traverser.Admin) {
- if (objectB instanceof Traverser.Admin) {
- final TraverserSet set = new TraverserSet();
- set.add((Traverser.Admin) objectA);
- set.add((Traverser.Admin) objectB);
- objectA = set;
- } else if (objectB instanceof TraverserSet) {
- final TraverserSet set = (TraverserSet) objectB;
- set.add((Traverser.Admin) objectA);
- if (null != this.barrierStep && set.size() > SIZE_LIMIT) {
- this.valueTraversal.reset();
- ((Step) this.barrierStep).addStarts(set.iterator());
- objectA = this.barrierStep.nextBarrier();
- } else
- objectA = objectB;
- } else if (objectB instanceof Pair) {
- final TraverserSet set = (TraverserSet) ((Pair) objectB).getValue0();
- set.add((Traverser.Admin) objectA);
- if (set.size() > SIZE_LIMIT) { // barrier step can never be null -- no need to check
- this.valueTraversal.reset();
- ((Step) this.barrierStep).addStarts(set.iterator());
- this.barrierStep.addBarrier(((Pair) objectB).getValue1());
- objectA = this.barrierStep.nextBarrier();
- } else
- objectA = Pair.with(set, ((Pair) objectB).getValue1());
- } else
- objectA = Pair.with(new TraverserSet((Traverser.Admin) objectA), objectB);
- // TRAVERSER SET
- } else if (objectA instanceof TraverserSet) {
- if (objectB instanceof Traverser.Admin) {
- final TraverserSet set = (TraverserSet) objectA;
- set.add((Traverser.Admin) objectB);
- if (null != this.barrierStep && set.size() > SIZE_LIMIT) {
- this.valueTraversal.reset();
- ((Step) this.barrierStep).addStarts(set.iterator());
- objectA = this.barrierStep.nextBarrier();
- }
- } else if (objectB instanceof TraverserSet) {
- final TraverserSet set = (TraverserSet) objectA;
- set.addAll((TraverserSet) objectB);
- if (null != this.barrierStep && set.size() > SIZE_LIMIT) {
- this.valueTraversal.reset();
- ((Step) this.barrierStep).addStarts(set.iterator());
- objectA = this.barrierStep.nextBarrier();
- }
- } else if (objectB instanceof Pair) {
- final TraverserSet set = (TraverserSet) objectA;
- set.addAll((TraverserSet) ((Pair) objectB).getValue0());
- if (set.size() > SIZE_LIMIT) { // barrier step can never be null -- no need to check
- this.valueTraversal.reset();
- ((Step) this.barrierStep).addStarts(set.iterator());
- this.barrierStep.addBarrier(((Pair) objectB).getValue1());
- objectA = this.barrierStep.nextBarrier();
- } else
- objectA = Pair.with(set, ((Pair) objectB).getValue1());
- } else
- objectA = Pair.with(objectA, objectB);
- // TRAVERSER SET + BARRIER
- } else if (objectA instanceof Pair) {
- if (objectB instanceof Traverser.Admin) {
- final TraverserSet set = ((TraverserSet) ((Pair) objectA).getValue0());
- set.add((Traverser.Admin) objectB);
- if (set.size() > SIZE_LIMIT) { // barrier step can never be null -- no need to check
- this.valueTraversal.reset();
- ((Step) this.barrierStep).addStarts(set.iterator());
- this.barrierStep.addBarrier(((Pair) objectA).getValue1());
- objectA = this.barrierStep.nextBarrier();
- }
- } else if (objectB instanceof TraverserSet) {
- final TraverserSet set = (TraverserSet) ((Pair) objectA).getValue0();
- set.addAll((TraverserSet) objectB);
- if (set.size() > SIZE_LIMIT) { // barrier step can never be null -- no need to check
- this.valueTraversal.reset();
- ((Step) this.barrierStep).addStarts(set.iterator());
- this.barrierStep.addBarrier(((Pair) objectA).getValue1());
- objectA = this.barrierStep.nextBarrier();
- }
- } else if (objectB instanceof Pair) {
- this.valueTraversal.reset();
- this.barrierStep.addBarrier(((Pair) objectA).getValue1());
- this.barrierStep.addBarrier(((Pair) objectB).getValue1());
- ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator());
- ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator());
- objectA = this.barrierStep.nextBarrier();
- } else {
- this.valueTraversal.reset();
- this.barrierStep.addBarrier(((Pair) objectA).getValue1());
- this.barrierStep.addBarrier(objectB);
- ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator());
- objectA = this.barrierStep.nextBarrier();
- }
- // BARRIER
+ if (null == this.barrierStep) {
+ ((TraverserSet) objectA).addAll((TraverserSet) objectB);
} else {
- if (objectB instanceof Traverser.Admin) {
- objectA = Pair.with(new TraverserSet<>((Traverser.Admin) objectB), objectA);
- } else if (objectB instanceof TraverserSet) {
- objectA = Pair.with(objectB, objectA);
- } else if (objectB instanceof Pair) {
- this.valueTraversal.reset();
- this.barrierStep.addBarrier(objectA);
- this.barrierStep.addBarrier(((Pair) objectB).getValue1());
- ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator());
- objectA = this.barrierStep.nextBarrier();
- } else {
- this.valueTraversal.reset();
- this.barrierStep.addBarrier(objectA);
- this.barrierStep.addBarrier(objectB);
- objectA = this.barrierStep.nextBarrier();
- }
+ this.barrierStep.addBarrier(objectA);
+ this.barrierStep.addBarrier(objectB);
+ objectA = this.barrierStep.nextBarrier();
}
}
mapA.put(key, (V) objectA);
@@ -321,16 +217,12 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
return mapA;
}
- // necessary to control Java Serialization to ensure proper clearing of internal traverser data
private void writeObject(final ObjectOutputStream outputStream) throws IOException {
- // necessary as a non-root child is being sent over the wire
- if (null != this.valueTraversal) this.valueTraversal.setParent(EmptyStep.instance());
- outputStream.writeObject(null == this.valueTraversal ? null : this.valueTraversal.clone()); // todo: reset() instead?
+ outputStream.writeObject(this.barrierStep); // todo: reset() instead?
}
private void readObject(final ObjectInputStream inputStream) throws IOException, ClassNotFoundException {
- this.valueTraversal = (Traversal.Admin<?, V>) inputStream.readObject();
- this.barrierStep = null == this.valueTraversal ? null : TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.valueTraversal).orElse(null);
+ this.barrierStep = (Barrier) inputStream.readObject();
}
}
@@ -354,35 +246,24 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
final Traversal.Admin<?, ?> first = __.identity().asAdmin();
boolean updated = false;
for (final Step step : valueTraversal.getSteps()) {
- if (step instanceof Barrier)
- break;
first.addStep(step.clone());
updated = true;
+ if (step instanceof Barrier)
+ break;
+
}
return updated ? first : null;
}
public static <K, V> Map<K, V> doFinalReduction(final Map<K, Object> map, final Traversal.Admin<?, V> valueTraversal) {
final Map<K, V> reducedMap = new HashMap<>(map.size());
- final Barrier reducingBarrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal).orElse(null);
+ final Barrier barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal).orElse(null);
IteratorUtils.removeOnNext(map.entrySet().iterator()).forEachRemaining(entry -> {
- if (null == reducingBarrierStep) {
- if (entry.getValue() instanceof TraverserSet) {
- if (!((TraverserSet) entry.getValue()).isEmpty())
- reducedMap.put(entry.getKey(), ((TraverserSet<V>) entry.getValue()).peek().get());
- } else
- reducedMap.put(entry.getKey(), (V) entry.getValue());
+ if (null == barrierStep) {
+ reducedMap.put(entry.getKey(), ((TraverserSet<V>) entry.getValue()).peek().get());
} else {
valueTraversal.reset();
- if (entry.getValue() instanceof Traverser.Admin)
- ((Step) reducingBarrierStep).addStart((Traverser.Admin) entry.getValue());
- else if (entry.getValue() instanceof TraverserSet)
- ((Step) reducingBarrierStep).addStarts(((TraverserSet) entry.getValue()).iterator());
- else if (entry.getValue() instanceof Pair) {
- ((Step) reducingBarrierStep).addStarts(((TraverserSet) (((Pair) entry.getValue()).getValue0())).iterator());
- reducingBarrierStep.addBarrier((((Pair) entry.getValue()).getValue1()));
- } else
- reducingBarrierStep.addBarrier(entry.getValue());
+ barrierStep.addBarrier(entry.getValue());
if (valueTraversal.hasNext())
reducedMap.put(entry.getKey(), valueTraversal.next());
}
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f53327b7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
index 0e8a4f5..bba795b 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
@@ -21,12 +21,14 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
@@ -76,15 +78,19 @@ public final class GroupSideEffectStep<S, K, V> extends SideEffectStep<S> implem
protected void sideEffect(final Traverser.Admin<S> traverser) {
final Map<K, V> map = new HashMap<>(1);
if (null == this.preTraversal) {
- map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverser.split());
+ map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) new TraverserSet<>(traverser));
} else {
- final TraverserSet traverserSet = new TraverserSet<>();
this.preTraversal.reset();
- this.preTraversal.addStart(traverser.split());
- while(this.preTraversal.hasNext()) {
- traverserSet.add(this.preTraversal.nextTraverser());
- }
- map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet);
+ this.preTraversal.addStart(traverser);
+ final Barrier barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.preTraversal).orElse(null);
+ if (null == barrierStep) {
+ final TraverserSet traverserSet = new TraverserSet();
+ while (this.preTraversal.hasNext()) {
+ traverserSet.add(this.preTraversal.nextTraverser());
+ }
+ map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet);
+ } else if (barrierStep.hasNextBarrier())
+ map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) barrierStep.nextBarrier());
}
this.getTraversal().getSideEffects().add(this.sideEffectKey, map);
}