You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2017/01/18 21:17:40 UTC

tinkerpop git commit: Got a super simple implementation of GroupStep working. Tada. I think I can get it even simpler too....

Repository: tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1606 [created] f53327b7f


Got a super simple implementation of GroupStep working. Tada. I think I can get it even simpler too....


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/f53327b7
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/f53327b7
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/f53327b7

Branch: refs/heads/TINKERPOP-1606
Commit: f53327b7fe7ba927a04400739d30b9f28ec85ac2
Parents: d1f0889
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Jan 18 14:17:35 2017 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Jan 18 14:17:35 2017 -0700

----------------------------------------------------------------------
 .../process/traversal/step/map/GroupStep.java   | 191 ++++---------------
 .../step/sideEffect/GroupSideEffectStep.java    |  20 +-
 2 files changed, 49 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f53327b7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index d6ce421..4d229d1 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -31,7 +31,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
 import org.apache.tinkerpop.gremlin.process.traversal.step.LambdaHolder;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
-import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
@@ -40,7 +40,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
-import org.javatuples.Pair;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -90,15 +89,19 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
     public Map<K, V> projectTraverser(final Traverser.Admin<S> traverser) {
         final Map<K, V> map = new HashMap<>(1);
         if (null == this.preTraversal) {
-            map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverser);
+            map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) new TraverserSet<>(traverser));
         } else {
-            final TraverserSet traverserSet = new TraverserSet<>();
             this.preTraversal.reset();
             this.preTraversal.addStart(traverser);
-            while (this.preTraversal.hasNext()) {
-                traverserSet.add(this.preTraversal.nextTraverser());
-            }
-            map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet);
+            final Barrier barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.preTraversal).orElse(null);
+            if (null == barrierStep) {
+                final TraverserSet set = new TraverserSet();
+                while (this.preTraversal.hasNext()) {
+                    set.add(this.preTraversal.nextTraverser());
+                }
+                map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) set);
+            } else if (barrierStep.hasNextBarrier())
+                map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) barrierStep.nextBarrier());
         }
         return map;
     }
@@ -160,20 +163,18 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
 
     public static final class GroupBiOperator<K, V> implements BinaryOperator<Map<K, V>>, Serializable, Cloneable {
 
-        // size limit before Barrier.processAllStarts() to lazy reduce
-        private static final int SIZE_LIMIT = 1000;
-
-        private Traversal.Admin<?, V> valueTraversal;
         private Barrier barrierStep;
 
         public GroupBiOperator(final Traversal.Admin<?, V> valueTraversal) {
             // if there is a lambda that can not be serialized, then simply use TraverserSets
-            if (TraversalHelper.hasStepOfAssignableClassRecursively(LambdaHolder.class, valueTraversal)) {
-                this.valueTraversal = null;
+            if (TraversalHelper.hasStepOfAssignableClassRecursively(LambdaHolder.class, valueTraversal))
                 this.barrierStep = null;
-            } else {
-                this.valueTraversal = valueTraversal.clone();
-                this.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.valueTraversal).orElse(null);
+            else {
+                this.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal.clone()).orElse(null);
+                if (this.barrierStep instanceof CollectingBarrierStep)
+                    this.barrierStep = null;
+                if (null != this.barrierStep)
+                    this.barrierStep = (Barrier) ((Step) this.barrierStep).clone();
             }
         }
 
@@ -185,10 +186,8 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
         public GroupBiOperator<K, V> clone() {
             try {
                 final GroupBiOperator<K, V> clone = (GroupBiOperator<K, V>) super.clone();
-                if (null != this.valueTraversal) {
-                    clone.valueTraversal = this.valueTraversal.clone();
-                    clone.barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, clone.valueTraversal).orElse(null);
-                }
+                if (null != this.barrierStep)
+                    clone.barrierStep = (Barrier) ((Step) this.barrierStep).clone();
                 return clone;
             } catch (final CloneNotSupportedException e) {
                 throw new IllegalStateException(e.getMessage(), e);
@@ -200,120 +199,17 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
             for (final K key : mapB.keySet()) {
                 Object objectA = mapA.get(key);
                 final Object objectB = mapB.get(key);
-                assert null != objectB;
                 if (null == objectA) {
                     objectA = objectB;
+                } else if (null == objectB) {
+
                 } else {
-                    // TRAVERSER
-                    if (objectA instanceof Traverser.Admin) {
-                        if (objectB instanceof Traverser.Admin) {
-                            final TraverserSet set = new TraverserSet();
-                            set.add((Traverser.Admin) objectA);
-                            set.add((Traverser.Admin) objectB);
-                            objectA = set;
-                        } else if (objectB instanceof TraverserSet) {
-                            final TraverserSet set = (TraverserSet) objectB;
-                            set.add((Traverser.Admin) objectA);
-                            if (null != this.barrierStep && set.size() > SIZE_LIMIT) {
-                                this.valueTraversal.reset();
-                                ((Step) this.barrierStep).addStarts(set.iterator());
-                                objectA = this.barrierStep.nextBarrier();
-                            } else
-                                objectA = objectB;
-                        } else if (objectB instanceof Pair) {
-                            final TraverserSet set = (TraverserSet) ((Pair) objectB).getValue0();
-                            set.add((Traverser.Admin) objectA);
-                            if (set.size() > SIZE_LIMIT) {    // barrier step can never be null -- no need to check
-                                this.valueTraversal.reset();
-                                ((Step) this.barrierStep).addStarts(set.iterator());
-                                this.barrierStep.addBarrier(((Pair) objectB).getValue1());
-                                objectA = this.barrierStep.nextBarrier();
-                            } else
-                                objectA = Pair.with(set, ((Pair) objectB).getValue1());
-                        } else
-                            objectA = Pair.with(new TraverserSet((Traverser.Admin) objectA), objectB);
-                        // TRAVERSER SET
-                    } else if (objectA instanceof TraverserSet) {
-                        if (objectB instanceof Traverser.Admin) {
-                            final TraverserSet set = (TraverserSet) objectA;
-                            set.add((Traverser.Admin) objectB);
-                            if (null != this.barrierStep && set.size() > SIZE_LIMIT) {
-                                this.valueTraversal.reset();
-                                ((Step) this.barrierStep).addStarts(set.iterator());
-                                objectA = this.barrierStep.nextBarrier();
-                            }
-                        } else if (objectB instanceof TraverserSet) {
-                            final TraverserSet set = (TraverserSet) objectA;
-                            set.addAll((TraverserSet) objectB);
-                            if (null != this.barrierStep && set.size() > SIZE_LIMIT) {
-                                this.valueTraversal.reset();
-                                ((Step) this.barrierStep).addStarts(set.iterator());
-                                objectA = this.barrierStep.nextBarrier();
-                            }
-                        } else if (objectB instanceof Pair) {
-                            final TraverserSet set = (TraverserSet) objectA;
-                            set.addAll((TraverserSet) ((Pair) objectB).getValue0());
-                            if (set.size() > SIZE_LIMIT) {  // barrier step can never be null -- no need to check
-                                this.valueTraversal.reset();
-                                ((Step) this.barrierStep).addStarts(set.iterator());
-                                this.barrierStep.addBarrier(((Pair) objectB).getValue1());
-                                objectA = this.barrierStep.nextBarrier();
-                            } else
-                                objectA = Pair.with(set, ((Pair) objectB).getValue1());
-                        } else
-                            objectA = Pair.with(objectA, objectB);
-                        // TRAVERSER SET + BARRIER
-                    } else if (objectA instanceof Pair) {
-                        if (objectB instanceof Traverser.Admin) {
-                            final TraverserSet set = ((TraverserSet) ((Pair) objectA).getValue0());
-                            set.add((Traverser.Admin) objectB);
-                            if (set.size() > SIZE_LIMIT) { // barrier step can never be null -- no need to check
-                                this.valueTraversal.reset();
-                                ((Step) this.barrierStep).addStarts(set.iterator());
-                                this.barrierStep.addBarrier(((Pair) objectA).getValue1());
-                                objectA = this.barrierStep.nextBarrier();
-                            }
-                        } else if (objectB instanceof TraverserSet) {
-                            final TraverserSet set = (TraverserSet) ((Pair) objectA).getValue0();
-                            set.addAll((TraverserSet) objectB);
-                            if (set.size() > SIZE_LIMIT) {   // barrier step can never be null -- no need to check
-                                this.valueTraversal.reset();
-                                ((Step) this.barrierStep).addStarts(set.iterator());
-                                this.barrierStep.addBarrier(((Pair) objectA).getValue1());
-                                objectA = this.barrierStep.nextBarrier();
-                            }
-                        } else if (objectB instanceof Pair) {
-                            this.valueTraversal.reset();
-                            this.barrierStep.addBarrier(((Pair) objectA).getValue1());
-                            this.barrierStep.addBarrier(((Pair) objectB).getValue1());
-                            ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator());
-                            ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator());
-                            objectA = this.barrierStep.nextBarrier();
-                        } else {
-                            this.valueTraversal.reset();
-                            this.barrierStep.addBarrier(((Pair) objectA).getValue1());
-                            this.barrierStep.addBarrier(objectB);
-                            ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectA).getValue0()).iterator());
-                            objectA = this.barrierStep.nextBarrier();
-                        }
-                        // BARRIER
+                    if (null == this.barrierStep) {
+                        ((TraverserSet) objectA).addAll((TraverserSet) objectB);
                     } else {
-                        if (objectB instanceof Traverser.Admin) {
-                            objectA = Pair.with(new TraverserSet<>((Traverser.Admin) objectB), objectA);
-                        } else if (objectB instanceof TraverserSet) {
-                            objectA = Pair.with(objectB, objectA);
-                        } else if (objectB instanceof Pair) {
-                            this.valueTraversal.reset();
-                            this.barrierStep.addBarrier(objectA);
-                            this.barrierStep.addBarrier(((Pair) objectB).getValue1());
-                            ((Step) this.barrierStep).addStarts(((TraverserSet) ((Pair) objectB).getValue0()).iterator());
-                            objectA = this.barrierStep.nextBarrier();
-                        } else {
-                            this.valueTraversal.reset();
-                            this.barrierStep.addBarrier(objectA);
-                            this.barrierStep.addBarrier(objectB);
-                            objectA = this.barrierStep.nextBarrier();
-                        }
+                        this.barrierStep.addBarrier(objectA);
+                        this.barrierStep.addBarrier(objectB);
+                        objectA = this.barrierStep.nextBarrier();
                     }
                 }
                 mapA.put(key, (V) objectA);
@@ -321,16 +217,12 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
             return mapA;
         }
 
-        // necessary to control Java Serialization to ensure proper clearing of internal traverser data
         private void writeObject(final ObjectOutputStream outputStream) throws IOException {
-            // necessary as a non-root child is being sent over the wire
-            if (null != this.valueTraversal) this.valueTraversal.setParent(EmptyStep.instance());
-            outputStream.writeObject(null == this.valueTraversal ? null : this.valueTraversal.clone()); // todo: reset() instead?
+            outputStream.writeObject(this.barrierStep); // todo: reset() instead?
         }
 
         private void readObject(final ObjectInputStream inputStream) throws IOException, ClassNotFoundException {
-            this.valueTraversal = (Traversal.Admin<?, V>) inputStream.readObject();
-            this.barrierStep = null == this.valueTraversal ? null : TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.valueTraversal).orElse(null);
+            this.barrierStep = (Barrier) inputStream.readObject();
         }
     }
 
@@ -354,35 +246,24 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
         final Traversal.Admin<?, ?> first = __.identity().asAdmin();
         boolean updated = false;
         for (final Step step : valueTraversal.getSteps()) {
-            if (step instanceof Barrier)
-                break;
             first.addStep(step.clone());
             updated = true;
+            if (step instanceof Barrier)
+                break;
+
         }
         return updated ? first : null;
     }
 
     public static <K, V> Map<K, V> doFinalReduction(final Map<K, Object> map, final Traversal.Admin<?, V> valueTraversal) {
         final Map<K, V> reducedMap = new HashMap<>(map.size());
-        final Barrier reducingBarrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal).orElse(null);
+        final Barrier barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, valueTraversal).orElse(null);
         IteratorUtils.removeOnNext(map.entrySet().iterator()).forEachRemaining(entry -> {
-            if (null == reducingBarrierStep) {
-                if (entry.getValue() instanceof TraverserSet) {
-                    if (!((TraverserSet) entry.getValue()).isEmpty())
-                        reducedMap.put(entry.getKey(), ((TraverserSet<V>) entry.getValue()).peek().get());
-                } else
-                    reducedMap.put(entry.getKey(), (V) entry.getValue());
+            if (null == barrierStep) {
+                reducedMap.put(entry.getKey(), ((TraverserSet<V>) entry.getValue()).peek().get());
             } else {
                 valueTraversal.reset();
-                if (entry.getValue() instanceof Traverser.Admin)
-                    ((Step) reducingBarrierStep).addStart((Traverser.Admin) entry.getValue());
-                else if (entry.getValue() instanceof TraverserSet)
-                    ((Step) reducingBarrierStep).addStarts(((TraverserSet) entry.getValue()).iterator());
-                else if (entry.getValue() instanceof Pair) {
-                    ((Step) reducingBarrierStep).addStarts(((TraverserSet) (((Pair) entry.getValue()).getValue0())).iterator());
-                    reducingBarrierStep.addBarrier((((Pair) entry.getValue()).getValue1()));
-                } else
-                    reducingBarrierStep.addBarrier(entry.getValue());
+                barrierStep.addBarrier(entry.getValue());
                 if (valueTraversal.hasNext())
                     reducedMap.put(entry.getKey(), valueTraversal.next());
             }

http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/f53327b7/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
index 0e8a4f5..bba795b 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
@@ -21,12 +21,14 @@ package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__;
+import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
 import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GroupStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
@@ -76,15 +78,19 @@ public final class GroupSideEffectStep<S, K, V> extends SideEffectStep<S> implem
     protected void sideEffect(final Traverser.Admin<S> traverser) {
         final Map<K, V> map = new HashMap<>(1);
         if (null == this.preTraversal) {
-            map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverser.split());
+            map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) new TraverserSet<>(traverser));
         } else {
-            final TraverserSet traverserSet = new TraverserSet<>();
             this.preTraversal.reset();
-            this.preTraversal.addStart(traverser.split());
-            while(this.preTraversal.hasNext()) {
-                traverserSet.add(this.preTraversal.nextTraverser());
-            }
-            map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet);
+            this.preTraversal.addStart(traverser);
+            final Barrier barrierStep = TraversalHelper.getFirstStepOfAssignableClass(Barrier.class, this.preTraversal).orElse(null);
+            if (null == barrierStep) {
+                final TraverserSet traverserSet = new TraverserSet();
+                while (this.preTraversal.hasNext()) {
+                    traverserSet.add(this.preTraversal.nextTraverser());
+                }
+                map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) traverserSet);
+            } else if (barrierStep.hasNextBarrier())
+                map.put(TraversalUtil.applyNullable(traverser, this.keyTraversal), (V) barrierStep.nextBarrier());
         }
         this.getTraversal().getSideEffects().add(this.sideEffectKey, map);
     }