You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/19 19:26:43 UTC
[25/40] incubator-tinkerpop git commit: the traversal steps provided
by TinkerPop are the foundation for all dsl. GraphTraversal is just a dsl of
traversal. Refactored the process API to reflect this concept. Fixed #592.
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SerialEnumerator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SerialEnumerator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SerialEnumerator.java
new file mode 100644
index 0000000..e4f8bef
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SerialEnumerator.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.map.match;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.function.BiConsumer;
+import java.util.function.Function;
+
+/**
+ * An enumerator which consumes values from an iterator and maps each value to a secondary enumerator
+ * (for example, a join)
+ * Enumerated indices cover all solutions in the secondary enumerators,
+ * in ascending order according to the value iterator and the enumerators' own indices.
+ *
+ * @author Joshua Shinavier (http://fortytwo.net)
+ */
+public class SerialEnumerator<T> implements Enumerator<T> {
+ private final String name;
+ private Iterator<T> iterator;
+ private final Function<T, Enumerator<T>> constructor;
+ private final List<Enumerator<T>> memory = new ArrayList<>();
+ private final List<T> values = new ArrayList<>();
+
+ // TODO: this only assigned, not accessed until the efficient implementation of size() is restored
+ private int completedEnumsSize = 0;
+
+ public SerialEnumerator(final String name,
+ final Iterator<T> iterator,
+ final Function<T, Enumerator<T>> constructor) {
+ this.name = name;
+ this.iterator = iterator;
+ this.constructor = constructor;
+ }
+
+ public int size() {
+ // TODO: restore the more efficient implementation of size() while taking into account that
+ // traversal iterators such as DefaultTraversal may return hasNext=true after first returning hasNext=false
+ /*
+ int size = completedEnumsSize;
+ if (!sideEffects.isEmpty()) {
+ size += sideEffects.get(sideEffects.size() - 1).size();
+ }
+ return size;
+ */
+
+ //*
+ int size = 0;
+ for (Enumerator<T> e : memory) size += e.size();
+ return size;
+ //*/
+ }
+
+ // note: *not* intended for random access; use binary search if this is ever needed
+ public boolean visitSolution(final int index,
+ final BiConsumer<String, T> visitor) {
+ int totalSize = 0;
+ int memIndex = 0;
+ while (true) {
+ if (memIndex < memory.size()) {
+ Enumerator<T> e = memory.get(memIndex);
+
+ if (e.visitSolution(index - totalSize, visitor)) {
+ // additionally, bind the value stored in this enumerator
+ MatchStep.visit(name, values.get(memIndex), visitor);
+
+ return true;
+ } else {
+ totalSize += e.size();
+ memIndex++;
+ }
+ } else {
+ if (null == iterator) {
+ return false;
+ } else if (!iterator.hasNext()) {
+ // free up memory as soon as possible
+ iterator = null;
+ return false;
+ }
+
+ if (!memory.isEmpty()) {
+ int lastSize = memory.get(memIndex - 1).size();
+
+ // first remove the head enumeration if it exists and is empty
+ // (only the head will ever be empty, avoiding wasted space)
+ if (0 == lastSize) {
+ memIndex--;
+ memory.remove(memIndex);
+ values.remove(memIndex);
+ } else {
+ completedEnumsSize += lastSize;
+ }
+ }
+
+ T value = iterator.next();
+ values.add(value);
+ Enumerator<T> e = constructor.apply(value);
+ memory.add(memory.size(), e);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SimpleEnumerator.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SimpleEnumerator.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SimpleEnumerator.java
new file mode 100644
index 0000000..eb5da9c
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/match/SimpleEnumerator.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.map.match;
+
+import java.util.Iterator;
+import java.util.function.BiConsumer;
+
+/**
+ * An enumerator of at most one element
+ *
+ * @author Joshua Shinavier (http://fortytwo.net)
+ */
+public class SimpleEnumerator<T> implements Enumerator<T> {
+
+ private final String name;
+ private Iterator<T> iterator;
+ private T element;
+
+ public SimpleEnumerator(final String name,
+ final Iterator<T> iterator) {
+ this.name = name;
+ this.iterator = iterator;
+ }
+
+ @Override
+ public int size() {
+ return null == element ? 0 : 1;
+ }
+
+ @Override
+ public boolean visitSolution(int index, BiConsumer<String, T> visitor) {
+ if (0 != index) {
+ return false;
+ }
+
+ if (null != iterator) {
+ if (iterator.hasNext()) {
+ element = iterator.next();
+ }
+ iterator = null;
+ }
+
+ if (null != element) {
+ MatchStep.visit(name, element, visitor);
+ return true;
+ }
+
+ return false;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AddPropertyStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AddPropertyStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AddPropertyStep.java
new file mode 100644
index 0000000..f79ed92
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AddPropertyStep.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.structure.Element;
+
+import java.util.EnumSet;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class AddPropertyStep<S extends Element> extends SideEffectStep<S> {
+
+ private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(TraverserRequirement.OBJECT);
+
+ private final String key;
+ private final Object value;
+
+ public AddPropertyStep(final Traversal.Admin traversal, final String key, final Object value) {
+ super(traversal);
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<S> traverser) {
+ traverser.get().property(this.key, this.value);
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return REQUIREMENTS;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AggregateStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AggregateStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AggregateStep.java
new file mode 100644
index 0000000..502037e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/AggregateStep.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.CollectingBarrierStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.function.BulkSetSupplier;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class AggregateStep<S> extends CollectingBarrierStep<S> implements SideEffectCapable, TraversalParent, MapReducer<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+ private Traversal.Admin<S, Object> aggregateTraversal = null;
+ private String sideEffectKey;
+
+ public AggregateStep(final Traversal.Admin traversal, final String sideEffectKey) {
+ super(traversal);
+ this.sideEffectKey = sideEffectKey;
+ this.getTraversal().asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, BulkSetSupplier.instance());
+ }
+
+ @Override
+ public String getSideEffectKey() {
+ return this.sideEffectKey;
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.sideEffectKey, this.aggregateTraversal);
+ }
+
+ @Override
+ public MapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> getMapReduce() {
+ return new AggregateMapReduce(this);
+ }
+
+ @Override
+ public void addLocalChild(final Traversal.Admin<?, ?> aggregateTraversal) {
+ this.aggregateTraversal = this.integrateChild(aggregateTraversal);
+ }
+
+ @Override
+ public List<Traversal.Admin<S, Object>> getLocalChildren() {
+ return null == this.aggregateTraversal ? Collections.emptyList() : Collections.singletonList(this.aggregateTraversal);
+ }
+
+ @Override
+ public void barrierConsumer(final TraverserSet<S> traverserSet) {
+ traverserSet.forEach(traverser ->
+ TraversalHelper.addToCollection(
+ traverser.getSideEffects().get(this.sideEffectKey),
+ TraversalUtil.applyNullable(traverser, this.aggregateTraversal),
+ traverser.bulk()));
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return this.getSelfAndChildRequirements(TraverserRequirement.BULK, TraverserRequirement.SIDE_EFFECTS);
+ }
+
+ @Override
+ public AggregateStep<S> clone() {
+ final AggregateStep<S> clone = (AggregateStep<S>) super.clone();
+ if (null != this.aggregateTraversal)
+ clone.aggregateTraversal = clone.integrateChild(this.aggregateTraversal.clone());
+ return clone;
+ }
+
+ ////////
+
+ public static final class AggregateMapReduce extends StaticMapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+ public static final String AGGREGATE_STEP_SIDE_EFFECT_KEY = "gremlin.aggregateStep.sideEffectKey";
+
+ private String sideEffectKey;
+ private Supplier<Collection> collectionSupplier;
+
+ private AggregateMapReduce() {
+
+ }
+
+ public AggregateMapReduce(final AggregateStep step) {
+ this.sideEffectKey = step.getSideEffectKey();
+ this.collectionSupplier = step.getTraversal().asAdmin().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ super.storeState(configuration);
+ configuration.setProperty(AGGREGATE_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
+ }
+
+ @Override
+ public void loadState(final Configuration configuration) {
+ this.sideEffectKey = configuration.getString(AGGREGATE_STEP_SIDE_EFFECT_KEY);
+ this.collectionSupplier = TraversalVertexProgram.getTraversalSupplier(configuration).get().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return stage.equals(Stage.MAP);
+ }
+
+ @Override
+ public void map(final Vertex vertex, final MapEmitter<NullObject, Object> emitter) {
+ VertexTraversalSideEffects.of(vertex).<Collection<?>>orElse(this.sideEffectKey, Collections.emptyList()).forEach(emitter::emit);
+ }
+
+ @Override
+ public Collection generateFinalResult(final Iterator<KeyValue<NullObject, Object>> keyValues) {
+ final Collection collection = this.collectionSupplier.get();
+ keyValues.forEachRemaining(keyValue -> collection.add(keyValue.getValue()));
+ return collection;
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return this.sideEffectKey;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GraphStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GraphStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GraphStep.java
new file mode 100644
index 0000000..1b6b514
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GraphStep.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.EngineDependent;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Element;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.iterator.EmptyIterator;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class GraphStep<S extends Element> extends StartStep<S> implements EngineDependent {
+
+ protected final Class<S> returnClass;
+ protected Object[] ids;
+ protected transient Supplier<Iterator<S>> iteratorSupplier;
+
+ public GraphStep(final Traversal.Admin traversal, final Class<S> returnClass, final Object... ids) {
+ super(traversal);
+ this.returnClass = returnClass;
+ this.ids = ids;
+ for (int i = 0; i < this.ids.length; i++) {
+ if (this.ids[i] instanceof Element)
+ this.ids[i] = ((Element) this.ids[i]).id();
+ }
+ this.iteratorSupplier = () -> (Iterator<S>) (Vertex.class.isAssignableFrom(this.returnClass) ?
+ this.getTraversal().getGraph().get().vertices(this.ids) :
+ this.getTraversal().getGraph().get().edges(this.ids));
+ }
+
+ public String toString() {
+ return TraversalHelper.makeStepString(this, Arrays.toString(this.ids), this.returnClass.getSimpleName().toLowerCase());
+ }
+
+ public boolean returnsVertices() {
+ return Vertex.class.isAssignableFrom(this.returnClass);
+ }
+
+ public boolean returnsEdges() {
+ return Edge.class.isAssignableFrom(this.returnClass);
+ }
+
+ public Class<S> getReturnClass() {
+ return this.returnClass;
+ }
+
+ public void setIteratorSupplier(final Supplier<Iterator<S>> iteratorSupplier) {
+ this.iteratorSupplier = iteratorSupplier;
+ }
+
+ public Object[] getIds() {
+ return this.ids;
+ }
+
+ public void clearIds() {
+ this.ids = new Object[0];
+ }
+
+ @Override
+ public void onEngine(final TraversalEngine traversalEngine) {
+ if (traversalEngine.isComputer()) {
+ this.iteratorSupplier = Collections::emptyIterator;
+ }
+ }
+
+ @Override
+ protected Traverser<S> processNextStart() {
+ if (this.first)
+ this.start = null == this.iteratorSupplier ? EmptyIterator.instance() : this.iteratorSupplier.get();
+ return super.processNextStart();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountSideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountSideEffectStep.java
new file mode 100644
index 0000000..5a80c4f
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupCountSideEffectStep.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
+import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.MapHelper;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GroupCountSideEffectStep<S, E> extends SideEffectStep<S> implements SideEffectCapable, TraversalParent, MapReducer<E, Long, E, Long, Map<E, Long>> {
+
+ private Traversal.Admin<S, E> groupTraversal = null;
+ private String sideEffectKey;
+
+ public GroupCountSideEffectStep(final Traversal.Admin traversal, final String sideEffectKey) {
+ super(traversal);
+ this.sideEffectKey = sideEffectKey;
+ this.traversal.asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, HashMapSupplier.instance());
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<S> traverser) {
+ final Map<Object, Long> groupCountMap = traverser.sideEffects(this.sideEffectKey);
+ MapHelper.incr(groupCountMap, TraversalUtil.applyNullable(traverser.asAdmin(), this.groupTraversal), traverser.bulk());
+ }
+
+ @Override
+ public String getSideEffectKey() {
+ return this.sideEffectKey;
+ }
+
+ @Override
+ public MapReduce<E, Long, E, Long, Map<E, Long>> getMapReduce() {
+ return new GroupCountSideEffectMapReduce<>(this);
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.sideEffectKey, this.groupTraversal);
+ }
+
+ @Override
+ public void addLocalChild(final Traversal.Admin<?, ?> groupTraversal) {
+ this.groupTraversal = this.integrateChild(groupTraversal);
+ }
+
+ @Override
+ public List<Traversal.Admin<S, E>> getLocalChildren() {
+ return null == this.groupTraversal ? Collections.emptyList() : Collections.singletonList(this.groupTraversal);
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return this.getSelfAndChildRequirements(TraverserRequirement.BULK, TraverserRequirement.SIDE_EFFECTS);
+ }
+
+ @Override
+ public GroupCountSideEffectStep<S, E> clone() {
+ final GroupCountSideEffectStep<S, E> clone = (GroupCountSideEffectStep<S, E>) super.clone();
+ if (null != this.groupTraversal)
+ clone.groupTraversal = clone.integrateChild(this.groupTraversal.clone());
+ return clone;
+ }
+
+ ///////
+
+ public static final class GroupCountSideEffectMapReduce<E> extends StaticMapReduce<E, Long, E, Long, Map<E, Long>> {
+
+ public static final String GROUP_COUNT_SIDE_EFFECT_STEP_SIDE_EFFECT_KEY = "gremlin.groupCountSideEffectStep.sideEffectKey";
+
+ private String sideEffectKey;
+ private Supplier<Map<E, Long>> mapSupplier;
+
+ private GroupCountSideEffectMapReduce() {
+
+ }
+
+ public GroupCountSideEffectMapReduce(final GroupCountSideEffectStep step) {
+ this.sideEffectKey = step.getSideEffectKey();
+ this.mapSupplier = step.getTraversal().asAdmin().getSideEffects().<Map<E, Long>>getRegisteredSupplier(this.sideEffectKey).orElse(HashMap::new);
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ super.storeState(configuration);
+ configuration.setProperty(GROUP_COUNT_SIDE_EFFECT_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
+ }
+
+ @Override
+ public void loadState(final Configuration configuration) {
+ this.sideEffectKey = configuration.getString(GROUP_COUNT_SIDE_EFFECT_STEP_SIDE_EFFECT_KEY);
+ this.mapSupplier = TraversalVertexProgram.getTraversalSupplier(configuration).get().getSideEffects().<Map<E, Long>>getRegisteredSupplier(this.sideEffectKey).orElse(HashMap::new);
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return true;
+ }
+
+ @Override
+ public void map(final Vertex vertex, final MapEmitter<E, Long> emitter) {
+ VertexTraversalSideEffects.of(vertex).<Map<E, Number>>orElse(this.sideEffectKey, Collections.emptyMap()).forEach((k, v) -> emitter.emit(k, v.longValue()));
+ }
+
+ @Override
+ public void reduce(final E key, final Iterator<Long> values, final ReduceEmitter<E, Long> emitter) {
+ long counter = 0;
+ while (values.hasNext()) {
+ counter = counter + values.next();
+ }
+ emitter.emit(key, counter);
+ }
+
+ @Override
+ public void combine(final E key, final Iterator<Long> values, final ReduceEmitter<E, Long> emitter) {
+ reduce(key, values, emitter);
+ }
+
+ @Override
+ public Map<E, Long> generateFinalResult(final Iterator<KeyValue<E, Long>> keyValues) {
+ final Map<E, Long> map = this.mapSupplier.get();
+ keyValues.forEachRemaining(keyValue -> map.put(keyValue.getKey(), keyValue.getValue()));
+ return map;
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return this.sideEffectKey;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
new file mode 100644
index 0000000..d344953
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/GroupSideEffectStep.java
@@ -0,0 +1,248 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.TraversalEngine;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
+import org.apache.tinkerpop.gremlin.process.traversal.step.EngineDependent;
+import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
+import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class GroupSideEffectStep<S, K, V, R> extends SideEffectStep<S> implements SideEffectCapable, TraversalParent, EngineDependent, MapReducer<K, Collection<V>, K, R, Map<K, R>> {
+
+ private char state = 'k';
+ private Traversal.Admin<S, K> keyTraversal = null;
+ private Traversal.Admin<S, V> valueTraversal = null;
+ private Traversal.Admin<Collection<V>, R> reduceTraversal = null;
+ private String sideEffectKey;
+ private boolean onGraphComputer = false;
+ private Map<K, Collection<V>> tempGroupByMap;
+
+ public GroupSideEffectStep(final Traversal.Admin traversal, final String sideEffectKey) {
+ super(traversal);
+ this.sideEffectKey = sideEffectKey;
+ this.traversal.asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, HashMapSupplier.instance());
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<S> traverser) {
+ final Map<K, Collection<V>> groupMap = null == this.tempGroupByMap ? traverser.sideEffects(this.sideEffectKey) : this.tempGroupByMap; // for nested traversals and not !starts.hasNext()
+ final K key = TraversalUtil.applyNullable(traverser, keyTraversal);
+ final V value = TraversalUtil.applyNullable(traverser, valueTraversal);
+ Collection<V> values = groupMap.get(key);
+ if (null == values) {
+ values = new BulkSet<>();
+ groupMap.put(key, values);
+ }
+ TraversalHelper.addToCollectionUnrollIterator(values, value, traverser.bulk());
+ //////// reducer for OLTP
+ if (!this.onGraphComputer && null != this.reduceTraversal && !this.starts.hasNext()) {
+ this.tempGroupByMap = groupMap;
+ final Map<K, R> reduceMap = new HashMap<>();
+ groupMap.forEach((k, vv) -> reduceMap.put(k, TraversalUtil.applyNullable(vv, this.reduceTraversal)));
+ traverser.sideEffects(this.sideEffectKey, reduceMap);
+ }
+ }
+
+ @Override
+ public String getSideEffectKey() {
+ return this.sideEffectKey;
+ }
+
+ @Override
+ public void onEngine(final TraversalEngine traversalEngine) {
+ this.onGraphComputer = traversalEngine.isComputer();
+ }
+
+ @Override
+ public MapReduce<K, Collection<V>, K, R, Map<K, R>> getMapReduce() {
+ return new GroupSideEffectMapReduce<>(this);
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.sideEffectKey, this.keyTraversal, this.valueTraversal, this.reduceTraversal);
+ }
+
+ @Override
+ public <A, B> List<Traversal.Admin<A, B>> getLocalChildren() {
+ final List<Traversal.Admin<A, B>> children = new ArrayList<>(3);
+ if (null != this.keyTraversal)
+ children.add((Traversal.Admin) this.keyTraversal);
+ if (null != this.valueTraversal)
+ children.add((Traversal.Admin) this.valueTraversal);
+ if (null != this.reduceTraversal)
+ children.add((Traversal.Admin) this.reduceTraversal);
+ return children;
+ }
+
+ public Traversal.Admin<Collection<V>, R> getReduceTraversal() {
+ return this.reduceTraversal;
+ }
+
+ @Override
+ public void addLocalChild(final Traversal.Admin<?, ?> kvrTraversal) {
+ if ('k' == this.state) {
+ this.keyTraversal = this.integrateChild(kvrTraversal);
+ this.state = 'v';
+ } else if ('v' == this.state) {
+ this.valueTraversal = this.integrateChild(kvrTraversal);
+ this.state = 'r';
+ } else if ('r' == this.state) {
+ this.reduceTraversal = this.integrateChild(kvrTraversal);
+ this.state = 'x';
+ } else {
+ throw new IllegalStateException("The key, value, and reduce functions for group()-step have already been set");
+ }
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return this.getSelfAndChildRequirements(TraverserRequirement.SIDE_EFFECTS, TraverserRequirement.BULK);
+ }
+
+ @Override
+ public GroupSideEffectStep<S, K, V, R> clone() {
+ final GroupSideEffectStep<S, K, V, R> clone = (GroupSideEffectStep<S, K, V, R>) super.clone();
+ if (null != this.keyTraversal)
+ clone.keyTraversal = clone.integrateChild(this.keyTraversal.clone());
+ if (null != this.valueTraversal)
+ clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone());
+ if (null != this.reduceTraversal)
+ clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone());
+ return clone;
+ }
+
+ ///////////
+
+ public static final class GroupSideEffectMapReduce<K, V, R> implements MapReduce<K, Collection<V>, K, R, Map<K, R>> {
+
+ public static final String GROUP_SIDE_EFFECT_STEP_SIDE_EFFECT_KEY = "gremlin.groupSideEffectStep.sideEffectKey";
+ public static final String GROUP_SIDE_EFFECT_STEP_STEP_ID = "gremlin.groupSideEffectStep.stepId";
+
+ private String sideEffectKey;
+ private String groupStepId;
+ private Traversal.Admin<Collection<V>, R> reduceTraversal;
+ private Supplier<Map<K, R>> mapSupplier;
+
+ private GroupSideEffectMapReduce() {
+
+ }
+
+ public GroupSideEffectMapReduce(final GroupSideEffectStep step) {
+ this.groupStepId = step.getId();
+ this.sideEffectKey = step.getSideEffectKey();
+ this.reduceTraversal = step.getReduceTraversal();
+ this.mapSupplier = step.getTraversal().asAdmin().getSideEffects().<Map<K, R>>getRegisteredSupplier(this.sideEffectKey).orElse(HashMap::new);
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ MapReduce.super.storeState(configuration);
+ configuration.setProperty(GROUP_SIDE_EFFECT_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
+ configuration.setProperty(GROUP_SIDE_EFFECT_STEP_STEP_ID, this.groupStepId);
+ }
+
+ @Override
+ public void loadState(final Configuration configuration) {
+ this.sideEffectKey = configuration.getString(GROUP_SIDE_EFFECT_STEP_SIDE_EFFECT_KEY);
+ this.groupStepId = configuration.getString(GROUP_SIDE_EFFECT_STEP_STEP_ID);
+ final Traversal.Admin<?, ?> traversal = TraversalVertexProgram.getTraversalSupplier(configuration).get();
+ if (!traversal.isLocked())
+ traversal.applyStrategies(); // TODO: this is a scary error prone requirement, but only a problem for GroupStep
+ final GroupSideEffectStep groupSideEffectStep = new TraversalMatrix<>(traversal).getStepById(this.groupStepId);
+ this.reduceTraversal = groupSideEffectStep.getReduceTraversal();
+ this.mapSupplier = traversal.getSideEffects().<Map<K, R>>getRegisteredSupplier(this.sideEffectKey).orElse(HashMap::new);
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return !stage.equals(Stage.COMBINE);
+ }
+
+ @Override
+ public void map(final Vertex vertex, final MapEmitter<K, Collection<V>> emitter) {
+ VertexTraversalSideEffects.of(vertex).<Map<K, Collection<V>>>orElse(this.sideEffectKey, Collections.emptyMap()).forEach(emitter::emit);
+ }
+
+ @Override
+ public void reduce(final K key, final Iterator<Collection<V>> values, final ReduceEmitter<K, R> emitter) {
+ final Set<V> set = new BulkSet<>();
+ values.forEachRemaining(set::addAll);
+ emitter.emit(key, TraversalUtil.applyNullable(set, this.reduceTraversal));
+ }
+
+ @Override
+ public Map<K, R> generateFinalResult(final Iterator<KeyValue<K, R>> keyValues) {
+ final Map<K, R> map = this.mapSupplier.get();
+ keyValues.forEachRemaining(keyValue -> map.put(keyValue.getKey(), keyValue.getValue()));
+ return map;
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return this.sideEffectKey;
+ }
+
+ @Override
+ public GroupSideEffectMapReduce<K, V, R> clone() {
+ try {
+ final GroupSideEffectMapReduce<K, V, R> clone = (GroupSideEffectMapReduce<K, V, R>) super.clone();
+ if (null != clone.reduceTraversal)
+ clone.reduceTraversal = this.reduceTraversal.clone();
+ return clone;
+ } catch (final CloneNotSupportedException e) {
+ throw new IllegalStateException(e.getMessage(), e);
+ }
+ }
+
+ @Override
+ public String toString() {
+ return StringFactory.mapReduceString(this, this.getMemoryKey());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/IdentityStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/IdentityStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/IdentityStep.java
new file mode 100644
index 0000000..269f2c9
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/IdentityStep.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+
+import java.util.NoSuchElementException;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class IdentityStep<S> extends AbstractStep<S, S> {
+
+ public IdentityStep(final Traversal.Admin traversal) {
+ super(traversal);
+ }
+
+ @Override
+ protected Traverser<S> processNextStart() throws NoSuchElementException {
+ return this.starts.next();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
new file mode 100644
index 0000000..34b5eab
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/InjectStep.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.util.iterator.ArrayIterator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class InjectStep<S> extends StartStep<S> {
+
+ private final S[] injections;
+
+ @SafeVarargs
+ public InjectStep(final Traversal.Admin traversal, final S... injections) {
+ super(traversal);
+ this.injections = injections;
+ this.start = new ArrayIterator<>(this.injections);
+ }
+
+ @Override
+ public InjectStep<S> clone() {
+ final InjectStep<S> clone = (InjectStep<S>) super.clone();
+ clone.start = new ArrayIterator<>(clone.injections);
+ return clone;
+ }
+
+ @Override
+ public void reset() {
+ super.reset();
+ this.start = new ArrayIterator<>(this.injections);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/LambdaSideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/LambdaSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/LambdaSideEffectStep.java
new file mode 100644
index 0000000..2050489
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/LambdaSideEffectStep.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+import java.util.function.Consumer;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class LambdaSideEffectStep<S> extends SideEffectStep<S> {
+
+ private final Consumer<Traverser<S>> consumer;
+
+ public LambdaSideEffectStep(final Traversal.Admin traversal, final Consumer<Traverser<S>> consumer) {
+ super(traversal);
+ this.consumer = consumer;
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<S> traverser) {
+ this.consumer.accept(traverser);
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.consumer);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileStep.java
new file mode 100644
index 0000000..4be3483
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/ProfileStep.java
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Step;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.util.DependantMutableMetrics;
+import org.apache.tinkerpop.gremlin.process.traversal.util.MutableMetrics;
+import org.apache.tinkerpop.gremlin.process.traversal.util.StandardTraversalMetrics;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMetrics;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
+/**
+ * @author Bob Briody (http://bobbriody.com)
+ */
+public final class ProfileStep<S> extends AbstractStep<S, S> implements MapReducer<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> {
+
+ // Stored in the Traversal sideEffects but kept here as a reference for convenience.
+ private StandardTraversalMetrics traversalMetrics;
+
+ public ProfileStep(final Traversal.Admin traversal) {
+ super(traversal);
+ }
+
+
+ @Override
+ public MapReduce<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> getMapReduce() {
+ return ProfileMapReduce.instance();
+ }
+
+ @Override
+ public Traverser<S> next() {
+ Traverser<S> ret = null;
+ initializeIfNeeded();
+ traversalMetrics.start(this.getId());
+ try {
+ ret = super.next();
+ return ret;
+ } finally {
+ if (ret != null) {
+ traversalMetrics.finish(this.getId(), ret.asAdmin().bulk());
+ } else {
+ traversalMetrics.stop(this.getId());
+ }
+ }
+ }
+
+ @Override
+ public boolean hasNext() {
+ initializeIfNeeded();
+ traversalMetrics.start(this.getId());
+ boolean ret = super.hasNext();
+ traversalMetrics.stop(this.getId());
+ return ret;
+ }
+
+ @Override
+ protected Traverser<S> processNextStart() throws NoSuchElementException {
+ return this.starts.next();
+ }
+
+ private void initializeIfNeeded() {
+ if (traversalMetrics != null) {
+ return;
+ }
+
+ createTraversalMetricsSideEffectIfNecessary();
+
+ // How can traversalMetrics still be null? When running on computer it may need to be re-initialized from
+ // sideEffects after serialization.
+ if (traversalMetrics == null) {
+ // look up the TraversalMetrics in the root traversal's sideEffects
+ Traversal t = this.getTraversal();
+ while (!(t.asAdmin().getParent() instanceof EmptyStep)) {
+ t = t.asAdmin().getParent().asStep().getTraversal();
+ }
+ traversalMetrics = t.asAdmin().getSideEffects().get(TraversalMetrics.METRICS_KEY);
+ }
+ }
+
+ private void createTraversalMetricsSideEffectIfNecessary() {
+ if (this.getTraversal().getSideEffects().exists(TraversalMetrics.METRICS_KEY)) {
+ // Already initialized
+ return;
+ }
+
+ if (!(this.getTraversal().getParent() instanceof EmptyStep)) {
+ // Initialization is handled at the top-level of the traversal only.
+ return;
+ }
+
+ // The following code is executed once per top-level (non-nested) Traversal for all Profile steps. (Technically,
+ // once per thread if using Computer.)
+
+ traversalMetrics = this.getTraversal().getSideEffects().getOrCreate(TraversalMetrics.METRICS_KEY, StandardTraversalMetrics::new);
+ prepTraversalForProfiling(this.getTraversal().asAdmin(), null);
+ }
+
+ // Walk the traversal steps and initialize the Metrics timers.
+ private void prepTraversalForProfiling(Traversal.Admin<?, ?> traversal, MutableMetrics parentMetrics) {
+
+ DependantMutableMetrics prevMetrics = null;
+ final List<Step> steps = traversal.getSteps();
+ for (int ii = 0; ii + 1 < steps.size(); ii = ii + 2) {
+ Step step = steps.get(ii);
+ ProfileStep profileStep = (ProfileStep) steps.get(ii + 1);
+
+ // Create metrics
+ MutableMetrics metrics;
+
+ // Computer metrics are "stand-alone" but Standard metrics handle double-counted upstream time.
+ if (traversal.getEngine().isComputer()) {
+ metrics = new MutableMetrics(step.getId(), step.toString());
+ } else {
+ metrics = new DependantMutableMetrics(step.getId(), step.toString(), prevMetrics);
+ prevMetrics = (DependantMutableMetrics) metrics;
+ }
+
+ // Initialize counters (necessary because some steps might end up being 0)
+ metrics.incrementCount(TraversalMetrics.ELEMENT_COUNT_ID, 0);
+ metrics.incrementCount(TraversalMetrics.TRAVERSER_COUNT_ID, 0);
+
+ // Add metrics to parent, if necessary
+ if (parentMetrics != null) {
+ parentMetrics.addNested(metrics);
+ }
+
+ // The TraversalMetrics sideEffect is shared across all the steps.
+ profileStep.traversalMetrics = this.traversalMetrics;
+
+ // Add root metrics to traversalMetrics
+ this.traversalMetrics.addMetrics(metrics, step.getId(), ii / 2, parentMetrics == null, profileStep.getId());
+
+ // Handle nested traversal
+ if (step instanceof TraversalParent) {
+ for (Traversal.Admin<?, ?> t : ((TraversalParent) step).getLocalChildren()) {
+ prepTraversalForProfiling(t, metrics);
+ }
+ for (Traversal.Admin<?, ?> t : ((TraversalParent) step).getGlobalChildren()) {
+ prepTraversalForProfiling(t, metrics);
+ }
+ }
+ }
+ }
+
+ //////////////////
+
+ public static final class ProfileMapReduce extends StaticMapReduce<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> {
+
+ private static ProfileMapReduce INSTANCE = new ProfileMapReduce();
+
+ private ProfileMapReduce() {
+
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return true;
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return TraversalMetrics.METRICS_KEY;
+ }
+
+ @Override
+ public void map(final Vertex vertex, final MapEmitter<NullObject, StandardTraversalMetrics> emitter) {
+ VertexTraversalSideEffects.of(vertex).<StandardTraversalMetrics>ifPresent(TraversalMetrics.METRICS_KEY, emitter::emit);
+ }
+
+ @Override
+ public void combine(final NullObject key, final Iterator<StandardTraversalMetrics> values, final ReduceEmitter<NullObject, StandardTraversalMetrics> emitter) {
+ reduce(key, values, emitter);
+ }
+
+ @Override
+ public void reduce(final NullObject key, final Iterator<StandardTraversalMetrics> values, final ReduceEmitter<NullObject, StandardTraversalMetrics> emitter) {
+ emitter.emit(StandardTraversalMetrics.merge(values));
+ }
+
+ @Override
+ public StandardTraversalMetrics generateFinalResult(final Iterator<KeyValue<NullObject, StandardTraversalMetrics>> keyValues) {
+ return keyValues.next().getValue();
+ }
+
+ public static ProfileMapReduce instance() {
+ return INSTANCE;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackElementValueStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackElementValueStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackElementValueStep.java
new file mode 100644
index 0000000..412164a
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackElementValueStep.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.structure.Element;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.function.BinaryOperator;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SackElementValueStep<S extends Element, V> extends SideEffectStep<S> {
+
+ private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+ TraverserRequirement.SACK,
+ TraverserRequirement.OBJECT
+ );
+
+ private BinaryOperator<V> operator;
+ private final String propertyKey;
+
+ public SackElementValueStep(final Traversal.Admin traversal, final BinaryOperator<V> operator, final String propertyKey) {
+ super(traversal);
+ this.operator = operator;
+ this.propertyKey = propertyKey;
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<S> traverser) {
+ traverser.get().values(this.propertyKey).forEachRemaining(value -> {
+ traverser.sack(this.operator.apply(traverser.sack(), (V) value));
+ });
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.operator, this.propertyKey);
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return REQUIREMENTS;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackObjectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackObjectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackObjectStep.java
new file mode 100644
index 0000000..cae570a
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SackObjectStep.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+
+import java.util.EnumSet;
+import java.util.Set;
+import java.util.function.BiFunction;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SackObjectStep<S, V> extends SideEffectStep<S> {
+
+ private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+ TraverserRequirement.SACK,
+ TraverserRequirement.OBJECT
+ );
+
+ private final BiFunction<V, S, V> operator;
+
+ public SackObjectStep(final Traversal.Admin traversal, final BiFunction<V, S, V> operator) {
+ super(traversal);
+ this.operator = operator;
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<S> traverser) {
+ traverser.sack(this.operator.apply(traverser.sack(), traverser.get()));
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return REQUIREMENTS;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectCapStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectCapStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectCapStep.java
new file mode 100644
index 0000000..28f1cf0
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectCapStep.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.SupplyingBarrierStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+
+import java.util.Arrays;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SideEffectCapStep<S, E> extends SupplyingBarrierStep<S, E> {
+
+ private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+ TraverserRequirement.SIDE_EFFECTS,
+ TraverserRequirement.OBJECT
+ );
+
+ private List<String> sideEffectKeys;
+
+ public SideEffectCapStep(final Traversal.Admin traversal, final String... sideEffectKeys) {
+ super(traversal);
+ if (0 == sideEffectKeys.length)
+ throw new IllegalArgumentException("At least one sideEffect key must be provided to " + this.getClass().getSimpleName());
+ this.sideEffectKeys = Arrays.asList(sideEffectKeys);
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.sideEffectKeys);
+ }
+
+ public List<String> getSideEffectKeys() {
+ return this.sideEffectKeys;
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return REQUIREMENTS;
+ }
+
+ @Override
+ public E supply() {
+ return this.sideEffectKeys.size() == 1 ?
+ this.getTraversal().asAdmin().getSideEffects().get(this.sideEffectKeys.get(0)) :
+ (E) this.getMapOfSideEffects();
+ }
+
+ public Map<String, Object> getMapOfSideEffects() {
+ final Map<String, Object> sideEffects = new HashMap<>();
+ for (final String sideEffectKey : this.sideEffectKeys) {
+ sideEffects.put(sideEffectKey, this.getTraversal().asAdmin().getSideEffects().get(sideEffectKey));
+ }
+ return sideEffects;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectStep.java
new file mode 100644
index 0000000..2854e6e
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SideEffectStep.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public abstract class SideEffectStep<S> extends AbstractStep<S, S> {
+
+ public SideEffectStep(final Traversal.Admin traversal) {
+ super(traversal);
+ }
+
+ protected abstract void sideEffect(final Traverser.Admin<S> traverser);
+
+ @Override
+ protected Traverser<S> processNextStart() {
+ final Traverser.Admin<S> traverser = this.starts.next();
+ this.sideEffect(traverser);
+ return traverser;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StartStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StartStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StartStep.java
new file mode 100644
index 0000000..5e762fe
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StartStep.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.AbstractStep;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+
+import java.util.Iterator;
+import java.util.stream.Stream;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public class StartStep<S> extends AbstractStep<S, S> {
+
+ protected Object start;
+ protected boolean first = true;
+
+ public StartStep(final Traversal.Admin traversal, final Object start) {
+ super(traversal);
+ this.start = start;
+ }
+
+ public StartStep(final Traversal.Admin traversal) {
+ this(traversal, null);
+ }
+
+ public <T> T getStart() {
+ return (T) this.start;
+ }
+
+ public boolean startAssignableTo(final Class... assignableClasses) {
+ return Stream.of(assignableClasses).filter(check -> check.isAssignableFrom(this.start.getClass())).findAny().isPresent();
+ }
+
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.start);
+ }
+
+ @Override
+ protected Traverser<S> processNextStart() {
+ if (this.first) {
+ if (null != this.start) {
+ if (this.start instanceof Iterator)
+ this.starts.add(this.getTraversal().getTraverserGenerator().generateIterator((Iterator<S>) this.start, this, 1l));
+ else
+ this.starts.add(this.getTraversal().getTraverserGenerator().generate((S) this.start, this, 1l));
+ }
+ this.first = false;
+ }
+ return this.starts.next();
+ }
+
+ @Override
+ public StartStep<S> clone() {
+ final StartStep<S> clone = (StartStep<S>) super.clone();
+ clone.first = true;
+ clone.start = null;
+ return clone;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StoreStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StoreStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StoreStep.java
new file mode 100644
index 0000000..20c92d6
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/StoreStep.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
+import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
+import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
+import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.BulkSet;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.util.function.BulkSetSupplier;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+import java.util.function.Supplier;
+
+/**
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class StoreStep<S> extends SideEffectStep<S> implements SideEffectCapable, TraversalParent, MapReducer<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+ private Traversal.Admin<S, Object> storeTraversal = null;
+ private String sideEffectKey;
+
+ public StoreStep(final Traversal.Admin traversal, final String sideEffectKey) {
+ super(traversal);
+ this.sideEffectKey = sideEffectKey;
+ this.traversal.asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, BulkSetSupplier.instance());
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<S> traverser) {
+ TraversalHelper.addToCollection(
+ traverser.sideEffects(this.sideEffectKey),
+ TraversalUtil.applyNullable(traverser.asAdmin(), this.storeTraversal),
+ traverser.bulk());
+ }
+
+ @Override
+ public String getSideEffectKey() {
+ return this.sideEffectKey;
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.sideEffectKey, this.storeTraversal);
+ }
+
+ @Override
+ public MapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> getMapReduce() {
+ return new StoreMapReduce(this);
+ }
+
+ @Override
+ public List<Traversal.Admin<S, Object>> getLocalChildren() {
+ return null == this.storeTraversal ? Collections.emptyList() : Collections.singletonList(this.storeTraversal);
+ }
+
+ @Override
+ public void addLocalChild(final Traversal.Admin<?, ?> storeTraversal) {
+ this.storeTraversal = this.integrateChild(storeTraversal);
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return this.getSelfAndChildRequirements(TraverserRequirement.SIDE_EFFECTS, TraverserRequirement.BULK);
+ }
+
+ @Override
+ public StoreStep<S> clone() {
+ final StoreStep<S> clone = (StoreStep<S>) super.clone();
+ if (null != this.storeTraversal)
+ clone.storeTraversal = clone.integrateChild(this.storeTraversal.clone());
+ return clone;
+ }
+
+ ///////////////
+
+ public static final class StoreMapReduce extends StaticMapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+ public static final String STORE_STEP_SIDE_EFFECT_KEY = "gremlin.storeStep.sideEffectKey";
+
+ private String sideEffectKey;
+ private Supplier<Collection> collectionSupplier;
+
+ private StoreMapReduce() {
+
+ }
+
+ public StoreMapReduce(final StoreStep step) {
+ this.sideEffectKey = step.getSideEffectKey();
+ this.collectionSupplier = step.getTraversal().asAdmin().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ super.storeState(configuration);
+ configuration.setProperty(STORE_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
+ }
+
+ @Override
+ public void loadState(final Configuration configuration) {
+ this.sideEffectKey = configuration.getString(STORE_STEP_SIDE_EFFECT_KEY);
+ this.collectionSupplier = TraversalVertexProgram.getTraversalSupplier(configuration).get().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return stage.equals(Stage.MAP);
+ }
+
+ @Override
+ public void map(final Vertex vertex, final MapEmitter<NullObject, Object> emitter) {
+ VertexTraversalSideEffects.of(vertex).<Collection<?>>orElse(this.sideEffectKey, Collections.emptyList()).forEach(emitter::emit);
+ }
+
+ @Override
+ public Collection generateFinalResult(final Iterator<KeyValue<NullObject, Object>> keyValues) {
+ final Collection collection = this.collectionSupplier.get();
+ keyValues.forEachRemaining(pair -> collection.add(pair.getValue()));
+ return collection;
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return this.sideEffectKey;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4c97e964/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SubgraphStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SubgraphStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SubgraphStep.java
new file mode 100644
index 0000000..0657585
--- /dev/null
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/sideEffect/SubgraphStep.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect;
+
+import org.apache.tinkerpop.gremlin.process.traversal.T;
+import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
+import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
+import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectCapable;
+import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
+import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.structure.Direction;
+import org.apache.tinkerpop.gremlin.structure.Edge;
+import org.apache.tinkerpop.gremlin.structure.Graph;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import org.apache.tinkerpop.gremlin.structure.VertexProperty;
+import org.apache.tinkerpop.gremlin.structure.util.GraphFactory;
+
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A side-effect step that produces an edge induced subgraph.
+ *
+ * @author Stephen Mallette (http://stephen.genoprime.com)
+ * @author Marko A. Rodriguez (http://markorodriguez.com)
+ */
+public final class SubgraphStep extends SideEffectStep<Edge> implements SideEffectCapable {
+
+ private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
+ TraverserRequirement.OBJECT,
+ TraverserRequirement.SIDE_EFFECTS
+ );
+
+ private Graph subgraph;
+ private String sideEffectKey;
+
+ private static final Map<String, Object> DEFAULT_CONFIGURATION = new HashMap<String, Object>() {{
+ put(Graph.GRAPH, "org.apache.tinkerpop.gremlin.tinkergraph.structure.TinkerGraph"); // hard coded because TinkerGraph is not part of gremlin-core
+ }};
+
+ // TODO: add support for side-effecting out an edge list.
+
+ public SubgraphStep(final Traversal.Admin traversal, final String sideEffectKey) {
+ super(traversal);
+ this.sideEffectKey = sideEffectKey;
+ this.getTraversal().asAdmin().getSideEffects().registerSupplierIfAbsent(this.sideEffectKey, () -> GraphFactory.open(DEFAULT_CONFIGURATION));
+ }
+
+ @Override
+ protected void sideEffect(final Traverser.Admin<Edge> traverser) {
+ if (null == this.subgraph) {
+ this.subgraph = traverser.sideEffects(this.sideEffectKey);
+ if (!this.subgraph.features().vertex().supportsUserSuppliedIds() || !this.subgraph.features().edge().supportsUserSuppliedIds())
+ throw new IllegalArgumentException("The provided subgraph must support user supplied ids for vertices and edges: " + this.subgraph);
+ }
+ SubgraphStep.addEdgeToSubgraph(this.subgraph, traverser.get());
+ }
+
+ @Override
+ public String getSideEffectKey() {
+ return this.sideEffectKey;
+ }
+
+ @Override
+ public String toString() {
+ return TraversalHelper.makeStepString(this, this.sideEffectKey);
+ }
+
+ @Override
+ public Set<TraverserRequirement> getRequirements() {
+ return REQUIREMENTS;
+ }
+
+ @Override
+ public SubgraphStep clone() {
+ final SubgraphStep clone = (SubgraphStep) super.clone();
+ this.subgraph = null;
+ return clone;
+ }
+
+ ///
+
+ private static Vertex getOrCreate(final Graph subgraph, final Vertex vertex) {
+ final Iterator<Vertex> vertexIterator = subgraph.vertices(vertex.id());
+ if (vertexIterator.hasNext()) return vertexIterator.next();
+ final Vertex subgraphVertex = subgraph.addVertex(T.id, vertex.id(), T.label, vertex.label());
+ vertex.properties().forEachRemaining(vertexProperty -> {
+ final VertexProperty<?> subgraphVertexProperty = subgraphVertex.property(vertexProperty.key(), vertexProperty.value(), T.id, vertexProperty.id()); // TODO: demand vertex property id?
+ vertexProperty.properties().forEachRemaining(property -> subgraphVertexProperty.<Object>property(property.key(), property.value()));
+ });
+ return subgraphVertex;
+ }
+
+ private static void addEdgeToSubgraph(final Graph subgraph, final Edge edge) {
+ final Iterator<Edge> edgeIterator = subgraph.edges(edge.id());
+ if (edgeIterator.hasNext()) return;
+ final Iterator<Vertex> vertexIterator = edge.vertices(Direction.BOTH);
+ final Vertex subGraphOutVertex = getOrCreate(subgraph, vertexIterator.next());
+ final Vertex subGraphInVertex = getOrCreate(subgraph, vertexIterator.next());
+ final Edge subGraphEdge = subGraphOutVertex.addEdge(edge.label(), subGraphInVertex, T.id, edge.id());
+ edge.properties().forEachRemaining(property -> subGraphEdge.<Object>property(property.key(), property.value()));
+ }
+}