You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2015/02/12 14:01:43 UTC
[17/77] [partial] incubator-tinkerpop git commit: moved com/tinkerpop
directories to org/apache/tinkerpop
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/MatchStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/MatchStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/MatchStep.java
new file mode 100644
index 0000000..7928345
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/MatchStep.java
@@ -0,0 +1,559 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.map.match;
+
+import com.tinkerpop.gremlin.process.FastNoSuchElementException;
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.step.AbstractStep;
+import com.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traverser.B_O_PA_S_SE_SL_Traverser;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.util.iterator.IteratorUtils;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.NoSuchElementException;
+import java.util.Set;
+import java.util.Stack;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * @author Joshua Shinavier (http://fortytwo.net)
+ */
+public final class MatchStep<S, E> extends AbstractStep<S, Map<String, E>> implements TraversalParent {
+
+ static final BiConsumer<String, Object> TRIVIAL_CONSUMER = (s, t) -> {
+ };
+
+ private static final String ANON_LABEL_PREFIX = "_";
+
+ // optimize before processing each start object, by default
+ private static final int DEFAULT_STARTS_PER_OPTIMIZE = 1;
+
+ private final String startLabel;
+ private final Map<String, List<TraversalWrapper<S, S>>> traversalsByStartAs;
+ private final List<Traversal> traversals = new ArrayList<>();
+
+ private int startsPerOptimize = DEFAULT_STARTS_PER_OPTIMIZE;
+ private int optimizeCounter = -1;
+ private int anonLabelCounter = 0;
+
+ private Enumerator<S> currentSolution;
+ private int currentIndex;
+
+ // initial value allows MatchStep to be used as a stand-alone query engine
+ private Traverser.Admin<S> currentStart;
+
+ public MatchStep(final Traversal.Admin traversal, final String startLabel, final Traversal... traversals) {
+ super(traversal);
+ this.startLabel = startLabel;
+ this.traversalsByStartAs = new HashMap<>();
+ this.currentStart = new B_O_PA_S_SE_SL_Traverser<>(null, this);
+ for (final Traversal tl : traversals) {
+ addTraversalPrivate(tl);
+ this.integrateChild(tl.asAdmin(), TYPICAL_LOCAL_OPERATIONS);
+ this.traversals.add(tl);
+ }
+ checkSolvability();
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return this.getSelfAndChildRequirements();
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.traversalsByStartAs);
+ }
+
+ /**
+ * Adds an individual traversal to an already-constructed MatchStep.
+ * The query must be solvable after addition (i.e. should not require the addition of
+ * further traversals in order to be solvable)
+ * This method should be called before the query is first executed.
+ *
+ * @param traversal the traversal to add
+ */
+ public void addTraversal(final Traversal<S, S> traversal) {
+ addTraversalPrivate(traversal);
+ this.traversals.add(traversal);
+ this.integrateChild(traversal.asAdmin(), TYPICAL_LOCAL_OPERATIONS);
+ checkSolvability();
+ }
+
+ public void setStartsPerOptimize(final int startsPerOptimize) {
+ if (startsPerOptimize < 1) {
+ throw new IllegalArgumentException();
+ }
+ this.startsPerOptimize = startsPerOptimize;
+ }
+
+ @Override
+ protected Traverser<Map<String, E>> processNextStart() throws NoSuchElementException {
+ final Map<String, E> map = new HashMap<>();
+ final Traverser<Map<String, E>> result = this.currentStart.split(map, this);
+ final BiConsumer<String, S> resultSetter = (name, value) -> map.put(name, (E) value);
+
+ while (true) { // break out when the current solution is exhausted and there are no more starts
+ if (null == this.currentSolution) {
+ if (this.starts.hasNext()) {
+ this.optimizeCounter = (this.optimizeCounter + 1) % this.startsPerOptimize;
+ if (0 == this.optimizeCounter) {
+ optimize();
+ }
+
+ this.currentStart = this.starts.next();
+ this.currentSolution = solveFor(IteratorUtils.of(this.currentStart.get()));
+ this.currentIndex = 0;
+ } else {
+ throw FastNoSuchElementException.instance();
+ }
+ }
+
+ map.clear();
+ if (this.currentSolution.visitSolution(this.currentIndex++, resultSetter)) {
+ return result;
+ } else {
+ this.currentSolution = null;
+ }
+ }
+ }
+
+ /**
+ * @return a description of the current state of this step, including the query plan and gathered statistics
+ */
+ public String summarize() {
+ final StringBuilder sb = new StringBuilder("match \"")
+ .append(this.startLabel)
+ .append("\":\t")
+ .append(findCost(this.startLabel))
+ .append('\n');
+ summarize(this.startLabel, sb, new HashSet<>(), 1);
+ return sb.toString();
+ }
+
+ private void summarize(final String outLabel,
+ final StringBuilder sb,
+ final Set<String> visited,
+ final int indent) {
+ if (!visited.contains(outLabel)) {
+ visited.add(outLabel);
+ final List<TraversalWrapper<S, S>> outs = traversalsByStartAs.get(outLabel);
+ if (null != outs) {
+ for (final TraversalWrapper<S, S> w : outs) {
+ for (int i = 0; i < indent; i++) sb.append('\t');
+ sb.append(outLabel).append("->").append(w.endLabel).append(":\t");
+ sb.append(findCost(w));
+ sb.append('\t').append(w);
+ sb.append('\n');
+ summarize(w.endLabel, sb, visited, indent + 1);
+ }
+ }
+ }
+ }
+
+ private void addTraversalPrivate(final Traversal<S, S> traversal) {
+
+ String startAs = traversal.asAdmin().getStartStep().getLabel().orElseThrow(()
+ -> new IllegalArgumentException("All match traversals must have their start step labeled with as()"));
+ String endAs = traversal.asAdmin().getEndStep().getLabel().orElse(null);
+ checkAs(startAs);
+ if (null == endAs) {
+ endAs = createAnonymousAs();
+ } else {
+ checkAs(endAs);
+ }
+
+ final TraversalWrapper<S, S> wrapper = new TraversalWrapper<>(traversal, startAs, endAs);
+ // index all wrapped traversals by their startLabel
+ List<TraversalWrapper<S, S>> l2 = this.traversalsByStartAs.get(startAs);
+ if (null == l2) {
+ l2 = new LinkedList<>();
+ this.traversalsByStartAs.put(startAs, l2);
+ }
+ l2.add(wrapper);
+ }
+
+ // given all the wrapped traversals, determine bad patterns in the set and throw exceptions if not solvable
+ private void checkSolvability() {
+ final Set<String> pathSet = new HashSet<>();
+ final Stack<String> stack = new Stack<>();
+ stack.push(this.startLabel);
+ int countTraversals = 0;
+ while (!stack.isEmpty()) {
+ final String outAs = stack.peek();
+ if (pathSet.contains(outAs)) {
+ stack.pop();
+ pathSet.remove(outAs);
+ } else {
+ pathSet.add(outAs);
+ final List<TraversalWrapper<S, S>> l = traversalsByStartAs.get(outAs);
+ if (null != l) {
+ for (final TraversalWrapper<S, S> tw : l) {
+ countTraversals++;
+ if (pathSet.contains(tw.endLabel)) {
+ throw new IllegalArgumentException("The provided traversal set contains a cycle due to '"
+ + tw.endLabel + '\'');
+ }
+ stack.push(tw.endLabel);
+ }
+ }
+ }
+ }
+
+ int totalTraversals = 0;
+ for (List<TraversalWrapper<S, S>> l : this.traversalsByStartAs.values()) {
+ totalTraversals += l.size();
+ }
+
+ if (countTraversals < totalTraversals) {
+ throw new IllegalArgumentException("The provided traversal set contains unreachable as-label(s)");
+ }
+ }
+
+ private static void checkAs(final String as) {
+ // note: this won't happen so long as the anon prefix is the same as Traversal.UNDERSCORE
+ if (isAnonymousAs(as)) {
+ throw new IllegalArgumentException("The step named '" + as + "' uses reserved prefix '"
+ + ANON_LABEL_PREFIX + '\'');
+ }
+ }
+
+ private static boolean isAnonymousAs(final String as) {
+ return as.startsWith(ANON_LABEL_PREFIX);
+ }
+
+ private String createAnonymousAs() {
+ return ANON_LABEL_PREFIX + ++this.anonLabelCounter;
+ }
+
+ /**
+ * Directly applies this match query to a sequence of inputs
+ *
+ * @param inputs a sequence of inputs
+ * @return an enumerator over all solutions
+ */
+ public Enumerator<S> solveFor(final Iterator<S> inputs) {
+ return solveFor(startLabel, inputs);
+ }
+
+ private Enumerator<S> solveFor(final String localStartAs,
+ final Iterator<S> inputs) {
+ List<TraversalWrapper<S, S>> outs = traversalsByStartAs.get(localStartAs);
+ if (null == outs) {
+ // no out-traversals from here; just enumerate the values bound to localStartAs
+ return isAnonymousAs(localStartAs)
+ ? new SimpleEnumerator<>(localStartAs, inputs)
+ : new IteratorEnumerator<>(localStartAs, inputs);
+ } else {
+ // for each value bound to localStartAs, feed it into all out-traversals in parallel and join the results
+ return new SerialEnumerator<>(localStartAs, inputs, o -> {
+ Enumerator<S> result = null;
+ Set<String> leftLabels = new HashSet<>();
+
+ for (TraversalWrapper<S, S> w : outs) {
+ TraversalUpdater<S, S> updater
+ = new TraversalUpdater<>(w, IteratorUtils.of(o), currentStart, this.getId());
+
+ Set<String> rightLabels = new HashSet<>();
+ addVariables(w.endLabel, rightLabels);
+ Enumerator<S> ie = solveFor(w.endLabel, updater);
+ result = null == result ? ie : crossJoin(result, ie, leftLabels, rightLabels);
+ leftLabels.addAll(rightLabels);
+ }
+
+ return result;
+ });
+ }
+ }
+
+ private static <T> Enumerator<T> crossJoin(final Enumerator<T> left,
+ final Enumerator<T> right,
+ final Set<String> leftLabels,
+ final Set<String> rightLabels) {
+ Set<String> shared = new HashSet<>();
+ for (String s : rightLabels) {
+ if (leftLabels.contains(s)) {
+ shared.add(s);
+ }
+ }
+
+ Enumerator<T> cj = new CrossJoinEnumerator<>(left, right);
+ return shared.size() > 0 ? new InnerJoinEnumerator<>(cj, shared) : cj;
+ }
+
+ // recursively add all non-anonymous variables from a starting point in the query
+ private void addVariables(final String localStartAs,
+ final Set<String> variables) {
+ if (!isAnonymousAs(localStartAs)) {
+ variables.add(localStartAs);
+ }
+
+ List<TraversalWrapper<S, S>> outs = traversalsByStartAs.get(localStartAs);
+ if (null != outs) {
+ for (TraversalWrapper<S, S> w : outs) {
+ String endAs = w.endLabel;
+ if (!variables.contains(endAs)) {
+ addVariables(endAs, variables);
+ }
+ }
+ }
+ }
+
+ // applies a visitor, skipping anonymous variables
+ static <T> void visit(final String name,
+ final T value,
+ final BiConsumer<String, T> visitor) {
+ if (!isAnonymousAs(name)) {
+ visitor.accept(name, value);
+ }
+ }
+
+ /**
+ * Computes and applies a new query plan based on gathered statistics about traversal inputs and outputs.
+ */
+ // note: optimize() is never called from within a solution iterator, as it changes the query plan
+ public void optimize() {
+ optimizeAt(startLabel);
+ }
+
+ private void optimizeAt(final String outAs) {
+ List<TraversalWrapper<S, S>> outs = traversalsByStartAs.get(outAs);
+ if (null != outs) {
+ for (TraversalWrapper<S, S> t : outs) {
+ optimizeAt(t.endLabel);
+ updateOrderingFactor(t);
+ }
+ Collections.sort(outs);
+ }
+ }
+
+ private double findCost(final TraversalWrapper<S, S> root) {
+ double bf = root.findBranchFactor();
+ return bf + findCost(root.endLabel, root.findBranchFactor());
+ }
+
+ private double findCost(final String outAs,
+ final double branchFactor) {
+ double bf = branchFactor;
+
+ double cost = 0;
+
+ List<TraversalWrapper<S, S>> outs = traversalsByStartAs.get(outAs);
+ if (null != outs) {
+ for (TraversalWrapper<S, S> child : outs) {
+ cost += bf * findCost(child);
+ bf *= child.findBranchFactor();
+ }
+ }
+
+ return cost;
+ }
+
+ /**
+ * @param outLabel the out-label of one or more traversals in the query
+ * @return the expected cost, in the current query plan, of applying the branch of the query plan at
+ * the given out-label to one start value
+ */
+ public double findCost(final String outLabel) {
+ return findCost(outLabel, 1.0);
+ }
+
+ private void updateOrderingFactor(final TraversalWrapper<S, S> w) {
+ w.orderingFactor = ((w.findBranchFactor() - 1) / findCost(w));
+ }
+
+ @Override
+ public List<Traversal> getLocalChildren() {
+ return this.traversals;
+ }
+
+ /**
+ * A wrapper for a traversal in a query which maintains statistics about the traversal as
+ * it consumes inputs and produces outputs.
+ * The "branch factor" of the traversal is an important factor in determining its place in the query plan.
+ */
+ // note: input and output counts are never "refreshed".
+ // The position of a traversal in a query never changes, although its priority / likelihood of being executed does.
+ // Priority in turn affects branch factor.
+ // However, with sufficient inputs and optimizations,the branch factor is expected to converge on a stable value.
+ public static class TraversalWrapper<A, B> implements Comparable<TraversalWrapper<A, B>> {
+ private final Traversal<A, B> traversal;
+ private final String startLabel, endLabel;
+ private int totalInputs = 0;
+ private int totalOutputs = 0;
+ private double orderingFactor;
+
+ public TraversalWrapper(final Traversal<A, B> traversal,
+ final String startLabel,
+ final String endLabel) {
+ this.traversal = traversal;
+ this.startLabel = startLabel;
+ this.endLabel = endLabel;
+ }
+
+ public void incrementInputs() {
+ this.totalInputs++;
+ }
+
+ public void incrementOutputs(int outputs) {
+ this.totalOutputs += outputs;
+ }
+
+ // TODO: take variance into account, to avoid penalizing traversals for early encounters with super-inputs,
+ // or simply for never having been tried
+ public double findBranchFactor() {
+ return 0 == this.totalInputs ? 1 : this.totalOutputs / ((double) this.totalInputs);
+ }
+
+ @Override
+ public int compareTo(final TraversalWrapper<A, B> other) {
+ return ((Double) this.orderingFactor).compareTo(other.orderingFactor);
+ }
+
+ public Traversal<A, B> getTraversal() {
+ return this.traversal;
+ }
+
+ public void reset() {
+ this.traversal.asAdmin().reset();
+ }
+
+ @Override
+ public String toString() {
+ return this.traversal.toString();
+ //return "[" + this.startLabel + "->" + this.endLabel + "," + findBranchFactor() + ","
+ // + this.totalInputs + "," + this.totalOutputs + "," + this.traversal + "]";
+ }
+ }
+
+ /**
+ * A helper object which wraps a traversal, submitting starts and counting results per start
+ */
+ public static class TraversalUpdater<A, B> implements Iterator<B> {
+ private final TraversalWrapper<A, B> w;
+ private int outputs = -1;
+
+ public TraversalUpdater(final TraversalWrapper<A, B> w,
+ final Iterator<A> inputs,
+ final Traverser<A> start,
+ final String as) {
+ this.w = w;
+
+ Iterator<A> seIter = new SideEffectIterator<>(inputs, ignored -> {
+ // only increment traversal input and output counts once an input
+ // has been completely processed by the traversal
+ if (-1 != outputs) {
+ w.incrementInputs();
+ w.incrementOutputs(outputs);
+ }
+ outputs = 0;
+ });
+ Iterator<Traverser<A>> starts = new MapIterator<>(seIter,
+ o -> {
+ final Traverser.Admin<A> traverser = ((Traverser.Admin<A>) start).split();
+ traverser.set((A) o);
+ return traverser;
+ });
+
+ w.reset();
+
+ // with the traversal "empty" and ready for re-use, add new starts
+ w.traversal.asAdmin().addStarts(starts);
+ }
+
+ // note: may return true after first returning false (inheriting this behavior from e.g. DefaultTraversal)
+ @Override
+ public boolean hasNext() {
+ return w.traversal.hasNext();
+ }
+
+ @Override
+ public B next() {
+ outputs++;
+ B b = w.traversal.next();
+
+ // immediately check hasNext(), possibly updating the traverser's statistics
+ // even if we otherwise abandon the iterator
+ w.traversal.hasNext();
+
+ return b;
+ }
+ }
+
+ // an iterator which executes a side-effect the first time hasNext() is called before a next()
+ private static class SideEffectIterator<T> implements Iterator<T> {
+ private final Consumer onHasNext;
+ private final Iterator<T> baseIterator;
+ private boolean ready = true;
+
+ private SideEffectIterator(final Iterator<T> baseIterator,
+ final Consumer onHasNext) {
+ this.onHasNext = onHasNext;
+ this.baseIterator = baseIterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ if (this.ready) {
+ this.onHasNext.accept(null);
+ this.ready = false;
+ }
+ return this.baseIterator.hasNext();
+ }
+
+ @Override
+ public T next() {
+ T value = this.baseIterator.next();
+ this.ready = true;
+ return value;
+ }
+ }
+
+ private static class MapIterator<A, B> implements Iterator<B> {
+ private final Function<A, B> map;
+ private final Iterator<A> baseIterator;
+
+ public MapIterator(final Iterator<A> baseIterator, final Function<A, B> map) {
+ this.map = map;
+ this.baseIterator = baseIterator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return this.baseIterator.hasNext();
+ }
+
+ @Override
+ public B next() {
+ return this.map.apply(this.baseIterator.next());
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SerialEnumerator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SerialEnumerator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SerialEnumerator.java
new file mode 100644
index 0000000..42570a3
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SerialEnumerator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.map.match;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * An enumerator which consumes values from an iterator and maps each value to a secondary enumerator
+ * (for example, a join)
+ * Enumerated indices cover all solutions in the secondary enumerators,
+ * in ascending order according to the value iterator and the enumerators' own indices.
+ *
+ * @author Joshua Shinavier (http://fortytwo.net)
+ */
+public class SerialEnumerator<T> implements Enumerator<T> {
+ private final String name;
+ private Iterator<T> iterator;
+ private final Function<T, Enumerator<T>> constructor;
+ private final List<Enumerator<T>> memory = new ArrayList<>();
+ private final List<T> values = new ArrayList<>();
+
+ // TODO: this only assigned, not accessed until the efficient implementation of size() is restored
+ private int completedEnumsSize = 0;
+
+ public SerialEnumerator(final String name,
+ final Iterator<T> iterator,
+ final Function<T, Enumerator<T>> constructor) {
+ this.name = name;
+ this.iterator = iterator;
+ this.constructor = constructor;
+ }
+
+ public int size() {
+ // TODO: restore the more efficient implementation of size() while taking into account that
+ // traversal iterators such as DefaultTraversal may return hasNext=true after first returning hasNext=false
+ /*
+ int size = completedEnumsSize;
+ if (!sideEffects.isEmpty()) {
+ size += sideEffects.get(sideEffects.size() - 1).size();
+ }
+ return size;
+ */
+
+ //*
+ int size = 0;
+ for (Enumerator<T> e : memory) size += e.size();
+ return size;
+ //*/
+ }
+
+ // note: *not* intended for random access; use binary search if this is ever needed
+ public boolean visitSolution(final int index,
+ final BiConsumer<String, T> visitor) {
+ int totalSize = 0;
+ int memIndex = 0;
+ while (true) {
+ if (memIndex < memory.size()) {
+ Enumerator<T> e = memory.get(memIndex);
+
+ if (e.visitSolution(index - totalSize, visitor)) {
+ // additionally, bind the value stored in this enumerator
+ MatchStep.visit(name, values.get(memIndex), visitor);
+
+ return true;
+ } else {
+ totalSize += e.size();
+ memIndex++;
+ }
+ } else {
+ if (null == iterator) {
+ return false;
+ } else if (!iterator.hasNext()) {
+ // free up memory as soon as possible
+ iterator = null;
+ return false;
+ }
+
+ if (!memory.isEmpty()) {
+ int lastSize = memory.get(memIndex - 1).size();
+
+ // first remove the head enumeration if it exists and is empty
+ // (only the head will ever be empty, avoiding wasted space)
+ if (0 == lastSize) {
+ memIndex--;
+ memory.remove(memIndex);
+ values.remove(memIndex);
+ } else {
+ completedEnumsSize += lastSize;
+ }
+ }
+
+ T value = iterator.next();
+ values.add(value);
+ Enumerator<T> e = constructor.apply(value);
+ memory.add(memory.size(), e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SimpleEnumerator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SimpleEnumerator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SimpleEnumerator.java
new file mode 100644
index 0000000..4f1dd8e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/match/SimpleEnumerator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.map.match;
+
+import java.util.Iterator;
+import java.util.function.BiConsumer;
+
+/**
+ * An enumerator of at most one element
+ *
+ * @author Joshua Shinavier (http://fortytwo.net)
+ */
+public class SimpleEnumerator<T> implements Enumerator<T> {
+
+ private final String name;
+ private Iterator<T> iterator;
+ private T element;
+
+ public SimpleEnumerator(final String name,
+ final Iterator<T> iterator) {
+ this.name = name;
+ this.iterator = iterator;
+ }
+
+ @Override
+ public int size() {
+ return null == element ? 0 : 1;
+ }
+
+ @Override
+ public boolean visitSolution(int index, BiConsumer<String, T> visitor) {
+ if (0 != index) {
+ return false;
+ }
+
+ if (null != iterator) {
+ if (iterator.hasNext()) {
+ element = iterator.next();
+ }
+ iterator = null;
+ }
+
+ if (null != element) {
+ MatchStep.visit(name, element, visitor);
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AddEdgeStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AddEdgeStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AddEdgeStep.java
new file mode 100644
index 0000000..68f7049
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AddEdgeStep.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.structure.Direction;
+import com.tinkerpop.gremlin.structure.Vertex;
+
+import java.util.EnumSet;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class AddEdgeStep extends SideEffectStep<Vertex> implements Reversible {
+
+ private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+ TraverserRequirement.PATH,
+ TraverserRequirement.OBJECT
+ );
+
+ // TODO: Weight key based on Traverser.getCount() ?
+
+ private final Direction direction;
+ private final String edgeLabel;
+ private final String stepLabel;
+ private final Object[] propertyKeyValues;
+
+ public AddEdgeStep(final Traversal.Admin traversal, final Direction direction, final String edgeLabel, final String stepLabel, final Object... propertyKeyValues) {
+ super(traversal);
+ this.direction = direction;
+ this.edgeLabel = edgeLabel;
+ this.stepLabel = stepLabel;
+ this.propertyKeyValues = propertyKeyValues;
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<Vertex> traverser) {
+ final Vertex currentVertex = traverser.get();
+ final Vertex otherVertex = traverser.path().get(stepLabel);
+ if (direction.equals(Direction.IN) || direction.equals(Direction.BOTH)) {
+ otherVertex.addEdge(edgeLabel, currentVertex, this.propertyKeyValues);
+ }
+ if (direction.equals(Direction.OUT) || direction.equals(Direction.BOTH)) {
+ currentVertex.addEdge(edgeLabel, otherVertex, this.propertyKeyValues);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.direction.name(), this.edgeLabel, this.stepLabel);
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return REQUIREMENTS;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AggregateStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AggregateStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AggregateStep.java
new file mode 100644
index 0000000..7b289f9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AggregateStep.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
+import com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.AggregateMapReduce;
+import com.tinkerpop.gremlin.process.graph.traversal.step.util.CollectingBarrierStep;
+import com.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import com.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.step.SideEffectRegistrar;
+import com.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.process.util.TraverserSet;
+import com.tinkerpop.gremlin.util.function.BulkSetSupplier;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class AggregateStep<S> extends CollectingBarrierStep<S> implements SideEffectRegistrar, SideEffectCapable, Reversible, TraversalParent, MapReducer<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+ private Traversal.Admin<S, Object> aggregateTraversal = new IdentityTraversal<>();
+ private String sideEffectKey;
+
+ public AggregateStep(final Traversal.Admin traversal, final String sideEffectKey) {
+ super(traversal);
+ this.sideEffectKey = sideEffectKey;
+ }
+
+ @Override
+ public void registerSideEffects() {
+ if (null == this.sideEffectKey) this.sideEffectKey = this.getId();
+ this.getTraversal().asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, BulkSetSupplier.instance());
+ }
+
+ @Override
+ public String getSideEffectKey() {
+ return this.sideEffectKey;
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.sideEffectKey, this.aggregateTraversal);
+ }
+
+ @Override
+ public MapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> getMapReduce() {
+ return new AggregateMapReduce(this);
+ }
+
+ @Override
+ public void addLocalChild(final Traversal.Admin<?, ?> traversal) {
+ this.aggregateTraversal = this.integrateChild(traversal, TYPICAL_LOCAL_OPERATIONS);
+ }
+
+ @Override
+ public List<Traversal.Admin<S, Object>> getLocalChildren() {
+ return Collections.singletonList(this.aggregateTraversal);
+ }
+
+ @Override
+ public void barrierConsumer(final TraverserSet<S> traverserSet) {
+ traverserSet.forEach(traverser ->
+ TraversalHelper.addToCollection(
+ traverser.getSideEffects().get(this.sideEffectKey),
+ TraversalUtil.apply(traverser, this.aggregateTraversal),
+ traverser.bulk()));
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return this.getSelfAndChildRequirements(TraverserRequirement.BULK, TraverserRequirement.SIDE_EFFECTS);
+ }
+
+ @Override
+ public AggregateStep<S> clone() throws CloneNotSupportedException {
+ final AggregateStep<S> clone = (AggregateStep<S>) super.clone();
+ clone.aggregateTraversal = this.integrateChild(this.aggregateTraversal.clone(), TYPICAL_LOCAL_OPERATIONS);
+ return clone;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GraphStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GraphStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GraphStep.java
new file mode 100644
index 0000000..f698f3e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GraphStep.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.TraversalEngine;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.step.EngineDependent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.structure.Edge;
+import com.tinkerpop.gremlin.structure.Element;
+import com.tinkerpop.gremlin.structure.Graph;
+import com.tinkerpop.gremlin.structure.Vertex;
+import com.tinkerpop.gremlin.util.iterator.EmptyIterator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GraphStep<S extends Element> extends StartStep<S> implements EngineDependent {
+
+ protected final Class<S> returnClass;
+ protected final Object[] ids;
+ protected transient Graph graph;
+ protected transient Supplier<Iterator<S>> iteratorSupplier;
+
+ public GraphStep(final Traversal.Admin traversal, final Graph graph, final Class<S> returnClass, final Object... ids) {
+ super(traversal);
+ this.graph = graph;
+ this.returnClass = returnClass;
+ this.ids = ids;
+ this.iteratorSupplier = () -> (Iterator<S>) (Vertex.class.isAssignableFrom(this.returnClass) ?
+ this.graph.iterators().<S>vertexIterator(this.ids) :
+ this.graph.iterators().edgeIterator(this.ids));
+ }
+
+ public String toString() {
+ return TraversalHelper.makeStepString(this, Arrays.asList(this.ids), this.returnClass.getSimpleName().toLowerCase());
+ }
+
+ public boolean returnsVertices() {
+ return Vertex.class.isAssignableFrom(this.returnClass);
+ }
+
+ public boolean returnsEdges() {
+ return Edge.class.isAssignableFrom(this.returnClass);
+ }
+
+ public Class<S> getReturnClass() {
+ return this.returnClass;
+ }
+
+ public void setIteratorSupplier(final Supplier<Iterator<S>> iteratorSupplier) {
+ this.iteratorSupplier = iteratorSupplier;
+ }
+
+ public <G extends Graph> G getGraph(final Class<G> graphClass) {
+ return (G) this.graph;
+ }
+
+ public Object[] getIds() {
+ return this.ids;
+ }
+
+ @Override
+ public void onEngine(final TraversalEngine traversalEngine) {
+ if (traversalEngine.equals(TraversalEngine.COMPUTER)) {
+ this.iteratorSupplier = Collections::emptyIterator;
+ }
+ }
+
+ @Override
+ protected Traverser<S> processNextStart() {
+ if (this.first)
+ this.start = null == this.iteratorSupplier ? EmptyIterator.instance() : this.iteratorSupplier.get();
+ return super.processNextStart();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupCountStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupCountStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupCountStep.java
new file mode 100644
index 0000000..288f7c8
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupCountStep.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
+import com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.GroupCountMapReduce;
+import com.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import com.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.step.SideEffectRegistrar;
+import com.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.process.util.MapHelper;
+import com.tinkerpop.gremlin.util.function.HashMapSupplier;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GroupCountStep<S> extends SideEffectStep<S> implements SideEffectRegistrar, SideEffectCapable, Reversible, TraversalParent, MapReducer<Object, Long, Object, Long, Map<Object, Long>> {
+
+ private Traversal.Admin<S, Object> groupTraversal = new IdentityTraversal<>();
+ private String sideEffectKey;
+ // TODO: onFirst like subgraph so we don't keep getting the map
+
+ public GroupCountStep(final Traversal.Admin traversal, final String sideEffectKey) {
+ super(traversal);
+ this.sideEffectKey = sideEffectKey;
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<S> traverser) {
+ final Map<Object, Long> groupCountMap = traverser.sideEffects(this.sideEffectKey);
+ MapHelper.incr(groupCountMap, TraversalUtil.apply(traverser.asAdmin(), this.groupTraversal), traverser.bulk());
+ }
+
+ @Override
+ public void registerSideEffects() {
+ if (this.sideEffectKey == null) this.sideEffectKey = this.getId();
+ this.traversal.asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, HashMapSupplier.instance());
+ }
+
+ @Override
+ public String getSideEffectKey() {
+ return this.sideEffectKey;
+ }
+
+ @Override
+ public MapReduce<Object, Long, Object, Long, Map<Object, Long>> getMapReduce() {
+ return new GroupCountMapReduce(this);
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.sideEffectKey, this.groupTraversal);
+ }
+
+ @Override
+ public void addLocalChild(final Traversal.Admin<?, ?> traversal) {
+ this.groupTraversal = this.integrateChild(traversal, TYPICAL_LOCAL_OPERATIONS);
+ }
+
+ @Override
+ public List<Traversal.Admin<S, Object>> getLocalChildren() {
+ return Collections.singletonList(this.groupTraversal);
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return this.getSelfAndChildRequirements(TraverserRequirement.BULK, TraverserRequirement.SIDE_EFFECTS);
+ }
+
+ @Override
+ public GroupCountStep<S> clone() throws CloneNotSupportedException {
+ final GroupCountStep<S> clone = (GroupCountStep<S>) super.clone();
+ clone.groupTraversal = this.integrateChild(this.groupTraversal.clone(), TYPICAL_LOCAL_OPERATIONS);
+ return clone;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupStep.java
new file mode 100644
index 0000000..dfc7071
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupStep.java
@@ -0,0 +1,156 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.TraversalEngine;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
+import com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.GroupMapReduce;
+import com.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import com.tinkerpop.gremlin.process.traversal.step.EngineDependent;
+import com.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.step.SideEffectRegistrar;
+import com.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.process.util.BulkSet;
+import com.tinkerpop.gremlin.util.function.HashMapSupplier;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GroupStep<S, K, V, R> extends SideEffectStep<S> implements SideEffectCapable, SideEffectRegistrar, TraversalParent, Reversible, EngineDependent, MapReducer<Object, Collection, Object, Object, Map> {
+
+ private char state = 'k';
+ private Traversal.Admin<S, K> keyTraversal = new IdentityTraversal<>();
+ private Traversal.Admin<S, V> valueTraversal = new IdentityTraversal<>();
+ private Traversal.Admin<Collection<V>, R> reduceTraversal = null;
+ private String sideEffectKey;
+ private boolean onGraphComputer = false;
+ private Map<K, Collection<V>> tempGroupByMap;
+
+ public GroupStep(final Traversal.Admin traversal, final String sideEffectKey) {
+ super(traversal);
+ this.sideEffectKey = sideEffectKey;
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<S> traverser) {
+ final Map<K, Collection<V>> groupByMap = null == this.tempGroupByMap ? traverser.sideEffects(this.sideEffectKey) : this.tempGroupByMap; // for nested traversals and not !starts.hasNext()
+ doGroup(traverser.asAdmin(), groupByMap, this.keyTraversal, this.valueTraversal);
+ if (!this.onGraphComputer && null != this.reduceTraversal && !this.starts.hasNext()) {
+ this.tempGroupByMap = groupByMap;
+ final Map<K, R> reduceMap = new HashMap<>();
+ doReduce(groupByMap, reduceMap, this.reduceTraversal);
+ traverser.sideEffects(this.sideEffectKey, reduceMap);
+ }
+ }
+
+ @Override
+ public String getSideEffectKey() {
+ return this.sideEffectKey;
+ }
+
+ @Override
+ public void registerSideEffects() {
+ if (this.sideEffectKey == null) this.sideEffectKey = this.getId();
+ this.traversal.asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, HashMapSupplier.instance());
+ }
+
+ private static <S, K, V> void doGroup(final Traverser.Admin<S> traverser, final Map<K, Collection<V>> groupMap, final Traversal.Admin<S, K> keyTraversal, final Traversal.Admin<S, V> valueTraversal) {
+ final K key = TraversalUtil.apply(traverser, keyTraversal);
+ final V value = TraversalUtil.apply(traverser, valueTraversal);
+ Collection<V> values = groupMap.get(key);
+ if (null == values) {
+ values = new BulkSet<>();
+ groupMap.put(key, values);
+ }
+ TraversalHelper.addToCollectionUnrollIterator(values, value, traverser.bulk());
+ }
+
+ private static <K, V, R> void doReduce(final Map<K, Collection<V>> groupMap, final Map<K, R> reduceMap, final Traversal.Admin<Collection<V>, R> reduceTraversal) {
+ groupMap.forEach((k, vv) -> reduceMap.put(k, TraversalUtil.apply(vv, reduceTraversal)));
+ }
+
+ @Override
+ public void onEngine(final TraversalEngine traversalEngine) {
+ this.onGraphComputer = traversalEngine.equals(TraversalEngine.COMPUTER);
+ }
+
+ @Override
+ public MapReduce<Object, Collection, Object, Object, Map> getMapReduce() {
+ return new GroupMapReduce(this);
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.sideEffectKey, this.keyTraversal, this.valueTraversal, this.reduceTraversal);
+ }
+
+ @Override
+ public <A, B> List<Traversal.Admin<A, B>> getLocalChildren() {
+ return null == this.reduceTraversal ? (List) Arrays.asList(this.keyTraversal, this.valueTraversal) : (List) Arrays.asList(this.keyTraversal, this.valueTraversal, this.reduceTraversal);
+ }
+
+ public Traversal.Admin<Collection<V>, R> getReduceTraversal() {
+ return this.reduceTraversal;
+ }
+
+ @Override
+ public void addLocalChild(final Traversal.Admin<?, ?> kvrTraversal) {
+ if ('k' == this.state) {
+ this.keyTraversal = this.integrateChild(kvrTraversal, TYPICAL_LOCAL_OPERATIONS);
+ this.state = 'v';
+ } else if ('v' == this.state) {
+ this.valueTraversal = this.integrateChild(kvrTraversal, TYPICAL_LOCAL_OPERATIONS);
+ this.state = 'r';
+ } else if ('r' == this.state) {
+ this.reduceTraversal = this.integrateChild(kvrTraversal, TYPICAL_LOCAL_OPERATIONS);
+ this.state = 'x';
+ } else {
+ throw new IllegalStateException("The key, value, and reduce functions for group()-step have already been set");
+ }
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return this.getSelfAndChildRequirements(TraverserRequirement.SIDE_EFFECTS, TraverserRequirement.BULK);
+ }
+
+ @Override
+ public GroupStep<S, K, V, R> clone() throws CloneNotSupportedException {
+ final GroupStep<S, K, V, R> clone = (GroupStep<S, K, V, R>) super.clone();
+ clone.keyTraversal = clone.integrateChild(this.keyTraversal.clone(), TYPICAL_LOCAL_OPERATIONS);
+ clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone(), TYPICAL_LOCAL_OPERATIONS);
+ if (null != this.reduceTraversal)
+ clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone(), TYPICAL_LOCAL_OPERATIONS);
+ return clone;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/IdentityStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/IdentityStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/IdentityStep.java
new file mode 100644
index 0000000..1afa1cb
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/IdentityStep.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.step.AbstractStep;
+
+import java.util.NoSuchElementException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class IdentityStep<S> extends AbstractStep<S, S> {
+
+ public IdentityStep(final Traversal.Admin traversal) {
+ super(traversal);
+ }
+
+ @Override
+ protected Traverser<S> processNextStart() throws NoSuchElementException {
+ return this.starts.next();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/InjectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/InjectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/InjectStep.java
new file mode 100644
index 0000000..4a03159
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/InjectStep.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.util.iterator.ArrayIterator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InjectStep<S> extends StartStep<S> {
+
+ private final S[] injections;
+
+ @SafeVarargs
+ public InjectStep(final Traversal.Admin traversal, final S... injections) {
+ super(traversal);
+ this.injections = injections;
+ this.start = new ArrayIterator<>(this.injections);
+ }
+
+ @Override
+ public InjectStep<S> clone() throws CloneNotSupportedException {
+ final InjectStep<S> clone = (InjectStep<S>) super.clone();
+ clone.start = new ArrayIterator<>(clone.injections);
+ return clone;
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ this.start = new ArrayIterator<>(this.injections);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/LambdaSideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/LambdaSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/LambdaSideEffectStep.java
new file mode 100644
index 0000000..b3f5a32
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/LambdaSideEffectStep.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+import java.util.function.Consumer;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class LambdaSideEffectStep<S> extends SideEffectStep<S> {
+
+ private final Consumer<Traverser<S>> consumer;
+
+ public LambdaSideEffectStep(final Traversal.Admin traversal, final Consumer<Traverser<S>> consumer) {
+ super(traversal);
+ this.consumer = consumer;
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<S> traverser) {
+ this.consumer.accept(traverser);
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.consumer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileStep.java
new file mode 100644
index 0000000..baee373
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileStep.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Step;
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.TraversalEngine;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.ProfileMapReduce;
+import com.tinkerpop.gremlin.process.traversal.step.AbstractStep;
+import com.tinkerpop.gremlin.process.util.metric.StandardTraversalMetrics;
+import com.tinkerpop.gremlin.process.util.metric.TraversalMetrics;
+
+import java.util.NoSuchElementException;
+
+/**
+ * @author Bob Briody (http://bobbriody.com)
+ */
+public final class ProfileStep<S> extends AbstractStep<S, S> implements Reversible, MapReducer<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> {
+
+ private final String name;
+
+ public ProfileStep(final Traversal.Admin traversal) {
+ super(traversal);
+ this.name = null;
+ }
+
+ public ProfileStep(final Traversal.Admin traversal, final Step step) {
+ super(traversal);
+ this.name = step.toString();
+ }
+
+ @Override
+ public MapReduce<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> getMapReduce() {
+ return new ProfileMapReduce();
+ }
+
+ @Override
+ public Traverser<S> next() {
+ // Wrap SideEffectStep's next() with timer.
+ StandardTraversalMetrics traversalMetrics = getTraversalMetricsUtil();
+
+ Traverser<S> ret = null;
+ traversalMetrics.start(this.getId());
+ try {
+ ret = super.next();
+ return ret;
+ } finally {
+ if (ret != null)
+ traversalMetrics.finish(this.getId(), ret.asAdmin().bulk());
+ else
+ traversalMetrics.stop(this.getId());
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ // Wrap SideEffectStep's hasNext() with timer.
+ StandardTraversalMetrics traversalMetrics = getTraversalMetricsUtil();
+ traversalMetrics.start(this.getId());
+ boolean ret = super.hasNext();
+ traversalMetrics.stop(this.getId());
+ return ret;
+ }
+
+ @Override
+ protected Traverser<S> processNextStart() throws NoSuchElementException {
+ return this.starts.next();
+ }
+
+ private StandardTraversalMetrics getTraversalMetricsUtil() {
+ StandardTraversalMetrics traversalMetrics = this.getTraversal().asAdmin().getSideEffects().getOrCreate(TraversalMetrics.METRICS_KEY, StandardTraversalMetrics::new);
+ final boolean isComputer = this.traversal.asAdmin().getEngine().get().equals(TraversalEngine.COMPUTER);
+ traversalMetrics.initializeIfNecessary(this.getId(), this.traversal.asAdmin().getSteps().indexOf(this), name, isComputer);
+ return traversalMetrics;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackElementValueStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackElementValueStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackElementValueStep.java
new file mode 100644
index 0000000..c259367
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackElementValueStep.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.structure.Element;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.function.BinaryOperator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SackElementValueStep<S extends Element, V> extends SideEffectStep<S> {
+
+ private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+ TraverserRequirement.SACK,
+ TraverserRequirement.OBJECT
+ );
+
+ private BinaryOperator<V> operator;
+ private final String propertyKey;
+
+ public SackElementValueStep(final Traversal.Admin traversal, final BinaryOperator<V> operator, final String propertyKey) {
+ super(traversal);
+ this.operator = operator;
+ this.propertyKey = propertyKey;
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<S> traverser) {
+ traverser.get().iterators().valueIterator(this.propertyKey).forEachRemaining(value -> {
+ traverser.sack(this.operator.apply(traverser.sack(), (V) value));
+ });
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.operator, this.propertyKey);
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return REQUIREMENTS;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackObjectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackObjectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackObjectStep.java
new file mode 100644
index 0000000..1f664a8
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SackObjectStep.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SackObjectStep<S, V> extends SideEffectStep<S> {
+
+ private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+ TraverserRequirement.SACK,
+ TraverserRequirement.OBJECT
+ );
+
+ private final BiFunction<V, S, V> operator;
+
+ public SackObjectStep(final Traversal.Admin traversal, final BiFunction<V, S, V> operator) {
+ super(traversal);
+ this.operator = operator;
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<S> traverser) {
+ traverser.sack(this.operator.apply(traverser.sack(), traverser.get()));
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return REQUIREMENTS;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectCapStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectCapStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectCapStep.java
new file mode 100644
index 0000000..de42a67
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectCapStep.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
+import com.tinkerpop.gremlin.process.graph.traversal.step.util.SupplyingBarrierStep;
+import com.tinkerpop.gremlin.process.traversal.step.SideEffectRegistrar;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+
+import java.util.*;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectCapStep<S, E> extends SupplyingBarrierStep<S, E> implements SideEffectRegistrar {
+
+ private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+ TraverserRequirement.SIDE_EFFECTS,
+ TraverserRequirement.OBJECT
+ );
+
+ private List<String> sideEffectKeys;
+
+ public SideEffectCapStep(final Traversal.Admin traversal, final String... sideEffectKeys) {
+ super(traversal);
+ this.sideEffectKeys = Arrays.asList(sideEffectKeys);
+ }
+
+ public void registerSideEffects() {
+ if (this.sideEffectKeys.isEmpty())
+ this.sideEffectKeys = Collections.singletonList(((SideEffectCapable) this.getPreviousStep()).getSideEffectKey());
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.sideEffectKeys);
+ }
+
+ public List<String> getSideEffectKeys() {
+ return this.sideEffectKeys;
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return REQUIREMENTS;
+ }
+
+ @Override
+ public E supply() {
+ return this.sideEffectKeys.size() == 1 ?
+ this.getTraversal().asAdmin().getSideEffects().get(this.sideEffectKeys.get(0)) :
+ (E) this.getMapOfSideEffects();
+ }
+
+ public Map<String, Object> getMapOfSideEffects() {
+ final Map<String, Object> sideEffects = new HashMap<>();
+ for (final String sideEffectKey : this.sideEffectKeys) {
+ sideEffects.put(sideEffectKey, this.getTraversal().asAdmin().getSideEffects().get(sideEffectKey));
+ }
+ return sideEffects;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectStep.java
new file mode 100644
index 0000000..7bc47b3
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/SideEffectStep.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.step.AbstractStep;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public abstract class SideEffectStep<S> extends AbstractStep<S, S> implements Reversible {
+
+ public SideEffectStep(final Traversal.Admin traversal) {
+ super(traversal);
+ }
+
+ protected abstract void sideEffect(final Traverser.Admin<S> traverser);
+
+ @Override
+ protected Traverser<S> processNextStart() {
+ final Traverser.Admin<S> traverser = this.starts.next();
+ this.sideEffect(traverser);
+ return traverser;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StartStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StartStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StartStep.java
new file mode 100644
index 0000000..c9bc624
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StartStep.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.step.AbstractStep;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class StartStep<S> extends AbstractStep<S, S> implements Reversible {
+
+ protected Object start;
+ protected boolean first = true;
+
+ public StartStep(final Traversal.Admin traversal, final Object start) {
+ super(traversal);
+ this.start = start;
+ }
+
+ public StartStep(final Traversal.Admin traversal) {
+ this(traversal, null);
+ }
+
+ public <T> T getStart() {
+ return (T) this.start;
+ }
+
+ public boolean startAssignableTo(final Class... assignableClasses) {
+ return Stream.of(assignableClasses).filter(check -> check.isAssignableFrom(this.start.getClass())).findAny().isPresent();
+ }
+
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.start);
+ }
+
+ @Override
+ protected Traverser<S> processNextStart() {
+ if (this.first) {
+ if (null != this.start) {
+ if (this.start instanceof Iterator)
+ this.starts.add(this.getTraversal().getTraverserGenerator().generateIterator((Iterator<S>) this.start, this, 1l));
+ else
+ this.starts.add(this.getTraversal().getTraverserGenerator().generate((S) this.start, this, 1l));
+ }
+ this.first = false;
+ }
+ return this.starts.next();
+ }
+
+ @Override
+ public StartStep<S> clone() throws CloneNotSupportedException {
+ final StartStep<S> clone = (StartStep<S>) super.clone();
+ clone.first = true;
+ clone.start = null;
+ return clone;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/1545201f/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StoreStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StoreStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StoreStep.java
new file mode 100644
index 0000000..e10be8a
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StoreStep.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+
+import com.tinkerpop.gremlin.process.Traversal;
+import com.tinkerpop.gremlin.process.Traverser;
+import com.tinkerpop.gremlin.process.computer.MapReduce;
+import com.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
+import com.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.StoreMapReduce;
+import com.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
+import com.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import com.tinkerpop.gremlin.process.traversal.step.Reversible;
+import com.tinkerpop.gremlin.process.traversal.step.SideEffectRegistrar;
+import com.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import com.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import com.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import com.tinkerpop.gremlin.util.function.BulkSetSupplier;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class StoreStep<S> extends SideEffectStep<S> implements SideEffectCapable, SideEffectRegistrar, Reversible, TraversalParent, MapReducer<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+ private Traversal.Admin<S, Object> storeTraversal = new IdentityTraversal<>();
+ private String sideEffectKey;
+
+ public StoreStep(final Traversal.Admin traversal, final String sideEffectKey) {
+ super(traversal);
+ this.sideEffectKey = sideEffectKey;
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<S> traverser) {
+ TraversalHelper.addToCollection(
+ traverser.sideEffects(this.sideEffectKey),
+ TraversalUtil.apply(traverser.asAdmin(), this.storeTraversal),
+ traverser.bulk());
+ }
+
+ @Override
+ public void registerSideEffects() {
+ if (null == this.sideEffectKey) this.sideEffectKey = this.getId();
+ this.traversal.asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, BulkSetSupplier.instance());
+ }
+
+ @Override
+ public String getSideEffectKey() {
+ return this.sideEffectKey;
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.sideEffectKey, this.storeTraversal);
+ }
+
+ @Override
+ public MapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> getMapReduce() {
+ return new StoreMapReduce(this);
+ }
+
+ @Override
+ public List<Traversal.Admin<S, Object>> getLocalChildren() {
+ return Collections.singletonList(this.storeTraversal);
+ }
+
+ @Override
+ public void addLocalChild(final Traversal.Admin<?, ?> storeTraversal) {
+ this.storeTraversal = this.integrateChild(storeTraversal, TYPICAL_LOCAL_OPERATIONS);
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return this.getSelfAndChildRequirements(TraverserRequirement.SIDE_EFFECTS, TraverserRequirement.BULK);
+ }
+
+ @Override
+ public StoreStep<S> clone() throws CloneNotSupportedException {
+ final StoreStep<S> clone = (StoreStep<S>) super.clone();
+ clone.storeTraversal = clone.integrateChild(this.storeTraversal.clone(), TYPICAL_LOCAL_OPERATIONS);
+ return clone;
+ }
+}