You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/02/20 17:09:20 UTC
incubator-tinkerpop git commit: all the mapreduce/ MapRedcuers are
now inner static classes of the respective step. More work on null traversals
throughout the sideEffects.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 9d5f3e98e -> ac12f999b
all the mapreduce/ MapRedcuers are now inner static classes of the respective step. More work on null traversals throughout the sideEffects.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ac12f999
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ac12f999
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ac12f999
Branch: refs/heads/master
Commit: ac12f999b58cfb8664f67545955aa4a15daff170
Parents: 9d5f3e9
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Feb 20 09:09:07 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Feb 20 09:09:16 2015 -0700
----------------------------------------------------------------------
.../graph/traversal/step/map/GroupStep.java | 14 ++--
.../step/sideEffect/AggregateStep.java | 77 +++++++++++++++--
.../sideEffect/GroupCountSideEffectStep.java | 14 ++--
.../step/sideEffect/GroupSideEffectStep.java | 55 ++++++------
.../traversal/step/sideEffect/ProfileStep.java | 56 ++++++++++++-
.../traversal/step/sideEffect/StoreStep.java | 74 ++++++++++++++--
.../traversal/step/sideEffect/TreeStep.java | 59 ++++++++++++-
.../mapreduce/AggregateMapReduce.java | 88 --------------------
.../sideEffect/mapreduce/ProfileMapReduce.java | 66 ---------------
.../sideEffect/mapreduce/StoreMapReduce.java | 88 --------------------
.../sideEffect/mapreduce/TreeMapReduce.java | 80 ------------------
11 files changed, 286 insertions(+), 385 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac12f999/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/GroupStep.java
index 7c795f1..ae75c70 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/map/GroupStep.java
@@ -146,19 +146,15 @@ public final class GroupStep<S, K, V, R> extends ReducingBarrierStep<S, Map<K, R
@Override
public Map<K, Collection<V>> apply(final Map<K, Collection<V>> mutatingSeed, final Traverser.Admin<S> traverser) {
- this.doGroup(traverser, mutatingSeed, GroupStep.this.keyTraversal, GroupStep.this.valueTraversal);
- return mutatingSeed;
- }
-
- private void doGroup(final Traverser.Admin<S> traverser, final Map<K, Collection<V>> groupMap, final Traversal.Admin<S, K> keyTraversal, final Traversal.Admin<S, V> valueTraversal) {
- final K key = TraversalUtil.applyNullable(traverser, keyTraversal);
- final V value = TraversalUtil.applyNullable(traverser, valueTraversal);
- Collection<V> values = groupMap.get(key);
+ final K key = TraversalUtil.applyNullable(traverser, GroupStep.this.keyTraversal);
+ final V value = TraversalUtil.applyNullable(traverser, GroupStep.this.valueTraversal);
+ Collection<V> values = mutatingSeed.get(key);
if (null == values) {
values = new BulkSet<>();
- groupMap.put(key, values);
+ mutatingSeed.put(key, values);
}
TraversalHelper.addToCollectionUnrollIterator(values, value, traverser.bulk());
+ return mutatingSeed;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac12f999/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AggregateStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AggregateStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AggregateStep.java
index 5d7bc80..2d5d5e8 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AggregateStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/AggregateStep.java
@@ -18,32 +18,39 @@
*/
package org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.Traversal;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
-import org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.AggregateMapReduce;
import org.apache.tinkerpop.gremlin.process.graph.traversal.step.util.CollectingBarrierStep;
-import org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.Reversible;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.util.BulkSet;
import org.apache.tinkerpop.gremlin.process.util.TraverserSet;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.function.BulkSetSupplier;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.function.Supplier;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class AggregateStep<S> extends CollectingBarrierStep<S> implements SideEffectCapable, Reversible, TraversalParent, MapReducer<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
- private Traversal.Admin<S, Object> aggregateTraversal = new IdentityTraversal<>();
+ private Traversal.Admin<S, Object> aggregateTraversal = null;
private String sideEffectKey;
public AggregateStep(final Traversal.Admin traversal, final String sideEffectKey) {
@@ -68,13 +75,13 @@ public final class AggregateStep<S> extends CollectingBarrierStep<S> implements
}
@Override
- public void addLocalChild(final Traversal.Admin<?, ?> traversal) {
- this.aggregateTraversal = this.integrateChild(traversal);
+ public void addLocalChild(final Traversal.Admin<?, ?> aggregateTraversal) {
+ this.aggregateTraversal = this.integrateChild(aggregateTraversal);
}
@Override
public List<Traversal.Admin<S, Object>> getLocalChildren() {
- return Collections.singletonList(this.aggregateTraversal);
+ return null == this.aggregateTraversal ? Collections.emptyList() : Collections.singletonList(this.aggregateTraversal);
}
@Override
@@ -82,7 +89,7 @@ public final class AggregateStep<S> extends CollectingBarrierStep<S> implements
traverserSet.forEach(traverser ->
TraversalHelper.addToCollection(
traverser.getSideEffects().get(this.sideEffectKey),
- TraversalUtil.apply(traverser, this.aggregateTraversal),
+ TraversalUtil.applyNullable(traverser, this.aggregateTraversal),
traverser.bulk()));
}
@@ -94,7 +101,61 @@ public final class AggregateStep<S> extends CollectingBarrierStep<S> implements
@Override
public AggregateStep<S> clone() throws CloneNotSupportedException {
final AggregateStep<S> clone = (AggregateStep<S>) super.clone();
- clone.aggregateTraversal = this.integrateChild(this.aggregateTraversal.clone());
+ if (null != this.aggregateTraversal)
+ clone.aggregateTraversal = clone.integrateChild(this.aggregateTraversal.clone());
return clone;
}
+
+ ////////
+
+ public static final class AggregateMapReduce extends StaticMapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+ public static final String AGGREGATE_STEP_SIDE_EFFECT_KEY = "gremlin.aggregateStep.sideEffectKey";
+
+ private String sideEffectKey;
+ private Supplier<Collection> collectionSupplier;
+
+ private AggregateMapReduce() {
+
+ }
+
+ public AggregateMapReduce(final AggregateStep step) {
+ this.sideEffectKey = step.getSideEffectKey();
+ this.collectionSupplier = step.getTraversal().asAdmin().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ super.storeState(configuration);
+ configuration.setProperty(AGGREGATE_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
+ }
+
+ @Override
+ public void loadState(final Configuration configuration) {
+ this.sideEffectKey = configuration.getString(AGGREGATE_STEP_SIDE_EFFECT_KEY);
+ this.collectionSupplier = TraversalVertexProgram.getTraversalSupplier(configuration).get().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return stage.equals(Stage.MAP);
+ }
+
+ @Override
+ public void map(final Vertex vertex, final MapEmitter<NullObject, Object> emitter) {
+ VertexTraversalSideEffects.of(vertex).<Collection<?>>orElse(this.sideEffectKey, Collections.emptyList()).forEach(emitter::emit);
+ }
+
+ @Override
+ public Collection generateFinalResult(final Iterator<KeyValue<NullObject, Object>> keyValues) {
+ final Collection collection = this.collectionSupplier.get();
+ keyValues.forEachRemaining(keyValue -> collection.add(keyValue.getValue()));
+ return collection;
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return this.sideEffectKey;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac12f999/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupCountSideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupCountSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupCountSideEffectStep.java
index 42144d9..0669b8c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupCountSideEffectStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupCountSideEffectStep.java
@@ -27,7 +27,6 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexPr
import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
-import org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.Reversible;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
@@ -51,7 +50,7 @@ import java.util.function.Supplier;
*/
public final class GroupCountSideEffectStep<S, E> extends SideEffectStep<S> implements SideEffectCapable, Reversible, TraversalParent, MapReducer<E, Long, E, Long, Map<E, Long>> {
- private Traversal.Admin<S, E> groupTraversal = new IdentityTraversal<>();
+ private Traversal.Admin<S, E> groupTraversal = null;
private String sideEffectKey;
public GroupCountSideEffectStep(final Traversal.Admin traversal, final String sideEffectKey) {
@@ -63,7 +62,7 @@ public final class GroupCountSideEffectStep<S, E> extends SideEffectStep<S> impl
@Override
protected void sideEffect(final Traverser.Admin<S> traverser) {
final Map<Object, Long> groupCountMap = traverser.sideEffects(this.sideEffectKey);
- MapHelper.incr(groupCountMap, TraversalUtil.apply(traverser.asAdmin(), this.groupTraversal), traverser.bulk());
+ MapHelper.incr(groupCountMap, TraversalUtil.applyNullable(traverser.asAdmin(), this.groupTraversal), traverser.bulk());
}
@Override
@@ -82,13 +81,13 @@ public final class GroupCountSideEffectStep<S, E> extends SideEffectStep<S> impl
}
@Override
- public void addLocalChild(final Traversal.Admin<?, ?> traversal) {
- this.groupTraversal = this.integrateChild(traversal);
+ public void addLocalChild(final Traversal.Admin<?, ?> groupTraversal) {
+ this.groupTraversal = this.integrateChild(groupTraversal);
}
@Override
public List<Traversal.Admin<S, E>> getLocalChildren() {
- return Collections.singletonList(this.groupTraversal);
+ return null == this.groupTraversal ? Collections.emptyList() : Collections.singletonList(this.groupTraversal);
}
@Override
@@ -99,7 +98,8 @@ public final class GroupCountSideEffectStep<S, E> extends SideEffectStep<S> impl
@Override
public GroupCountSideEffectStep<S, E> clone() throws CloneNotSupportedException {
final GroupCountSideEffectStep<S, E> clone = (GroupCountSideEffectStep<S, E>) super.clone();
- clone.groupTraversal = this.integrateChild(this.groupTraversal.clone());
+ if (null != this.groupTraversal)
+ clone.groupTraversal = clone.integrateChild(this.groupTraversal.clone());
return clone;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac12f999/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupSideEffectStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupSideEffectStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupSideEffectStep.java
index 93d9ed3..44b19aa 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupSideEffectStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/GroupSideEffectStep.java
@@ -28,7 +28,6 @@ import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexPr
import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
import org.apache.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalMatrix;
-import org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.EngineDependent;
import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.Reversible;
@@ -41,7 +40,7 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
-import java.util.Arrays;
+import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -57,8 +56,8 @@ import java.util.function.Supplier;
public final class GroupSideEffectStep<S, K, V, R> extends SideEffectStep<S> implements SideEffectCapable, TraversalParent, Reversible, EngineDependent, MapReducer<K, Collection<V>, K, R, Map<K, R>> {
private char state = 'k';
- private Traversal.Admin<S, K> keyTraversal = new IdentityTraversal<>();
- private Traversal.Admin<S, V> valueTraversal = new IdentityTraversal<>();
+ private Traversal.Admin<S, K> keyTraversal = null;
+ private Traversal.Admin<S, V> valueTraversal = null;
private Traversal.Admin<Collection<V>, R> reduceTraversal = null;
private String sideEffectKey;
private boolean onGraphComputer = false;
@@ -72,12 +71,20 @@ public final class GroupSideEffectStep<S, K, V, R> extends SideEffectStep<S> imp
@Override
protected void sideEffect(final Traverser.Admin<S> traverser) {
- final Map<K, Collection<V>> groupByMap = null == this.tempGroupByMap ? traverser.sideEffects(this.sideEffectKey) : this.tempGroupByMap; // for nested traversals and not !starts.hasNext()
- doGroup(traverser.asAdmin(), groupByMap, this.keyTraversal, this.valueTraversal);
+ final Map<K, Collection<V>> groupMap = null == this.tempGroupByMap ? traverser.sideEffects(this.sideEffectKey) : this.tempGroupByMap; // for nested traversals and not !starts.hasNext()
+ final K key = TraversalUtil.applyNullable(traverser, keyTraversal);
+ final V value = TraversalUtil.applyNullable(traverser, valueTraversal);
+ Collection<V> values = groupMap.get(key);
+ if (null == values) {
+ values = new BulkSet<>();
+ groupMap.put(key, values);
+ }
+ TraversalHelper.addToCollectionUnrollIterator(values, value, traverser.bulk());
+ //////// reducer for OLTP
if (!this.onGraphComputer && null != this.reduceTraversal && !this.starts.hasNext()) {
- this.tempGroupByMap = groupByMap;
+ this.tempGroupByMap = groupMap;
final Map<K, R> reduceMap = new HashMap<>();
- doReduce(groupByMap, reduceMap, this.reduceTraversal);
+ groupMap.forEach((k, vv) -> reduceMap.put(k, TraversalUtil.applyNullable(vv, this.reduceTraversal)));
traverser.sideEffects(this.sideEffectKey, reduceMap);
}
}
@@ -87,21 +94,6 @@ public final class GroupSideEffectStep<S, K, V, R> extends SideEffectStep<S> imp
return this.sideEffectKey;
}
- private static <S, K, V> void doGroup(final Traverser.Admin<S> traverser, final Map<K, Collection<V>> groupMap, final Traversal.Admin<S, K> keyTraversal, final Traversal.Admin<S, V> valueTraversal) {
- final K key = TraversalUtil.apply(traverser, keyTraversal);
- final V value = TraversalUtil.apply(traverser, valueTraversal);
- Collection<V> values = groupMap.get(key);
- if (null == values) {
- values = new BulkSet<>();
- groupMap.put(key, values);
- }
- TraversalHelper.addToCollectionUnrollIterator(values, value, traverser.bulk());
- }
-
- private static <K, V, R> void doReduce(final Map<K, Collection<V>> groupMap, final Map<K, R> reduceMap, final Traversal.Admin<Collection<V>, R> reduceTraversal) {
- groupMap.forEach((k, vv) -> reduceMap.put(k, TraversalUtil.apply(vv, reduceTraversal)));
- }
-
@Override
public void onEngine(final TraversalEngine traversalEngine) {
this.onGraphComputer = traversalEngine.isComputer();
@@ -119,7 +111,14 @@ public final class GroupSideEffectStep<S, K, V, R> extends SideEffectStep<S> imp
@Override
public <A, B> List<Traversal.Admin<A, B>> getLocalChildren() {
- return null == this.reduceTraversal ? (List) Arrays.asList(this.keyTraversal, this.valueTraversal) : (List) Arrays.asList(this.keyTraversal, this.valueTraversal, this.reduceTraversal);
+ final List<Traversal.Admin<A, B>> children = new ArrayList<>(3);
+ if (null != this.keyTraversal)
+ children.add((Traversal.Admin) this.keyTraversal);
+ if (null != this.valueTraversal)
+ children.add((Traversal.Admin) this.valueTraversal);
+ if (null != this.reduceTraversal)
+ children.add((Traversal.Admin) this.reduceTraversal);
+ return children;
}
public Traversal.Admin<Collection<V>, R> getReduceTraversal() {
@@ -150,8 +149,10 @@ public final class GroupSideEffectStep<S, K, V, R> extends SideEffectStep<S> imp
@Override
public GroupSideEffectStep<S, K, V, R> clone() throws CloneNotSupportedException {
final GroupSideEffectStep<S, K, V, R> clone = (GroupSideEffectStep<S, K, V, R>) super.clone();
- clone.keyTraversal = clone.integrateChild(this.keyTraversal.clone());
- clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone());
+ if (null != this.keyTraversal)
+ clone.keyTraversal = clone.integrateChild(this.keyTraversal.clone());
+ if (null != this.valueTraversal)
+ clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone());
if (null != this.reduceTraversal)
clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone());
return clone;
@@ -213,7 +214,7 @@ public final class GroupSideEffectStep<S, K, V, R> extends SideEffectStep<S> imp
public void reduce(final K key, final Iterator<Collection<V>> values, final ReduceEmitter<K, R> emitter) {
final Set<V> set = new BulkSet<>();
values.forEachRemaining(set::addAll);
- emitter.emit(key, (null == this.reduceTraversal) ? (R) set : TraversalUtil.apply(set, this.reduceTraversal));
+ emitter.emit(key, TraversalUtil.applyNullable(set, this.reduceTraversal));
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac12f999/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileStep.java
index bf930af..830a969 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/ProfileStep.java
@@ -20,16 +20,19 @@ package org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
import org.apache.tinkerpop.gremlin.process.Step;
import org.apache.tinkerpop.gremlin.process.Traversal;
-import org.apache.tinkerpop.gremlin.process.TraversalEngine;
import org.apache.tinkerpop.gremlin.process.Traverser;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
+import org.apache.tinkerpop.gremlin.process.traversal.step.AbstractStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.Reversible;
-import org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.ProfileMapReduce;
-import org.apache.tinkerpop.gremlin.process.traversal.step.AbstractStep;
import org.apache.tinkerpop.gremlin.process.util.metric.StandardTraversalMetrics;
import org.apache.tinkerpop.gremlin.process.util.metric.TraversalMetrics;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
+import java.util.Iterator;
import java.util.NoSuchElementException;
/**
@@ -51,7 +54,7 @@ public final class ProfileStep<S> extends AbstractStep<S, S> implements Reversib
@Override
public MapReduce<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> getMapReduce() {
- return new ProfileMapReduce();
+ return ProfileMapReduce.instance();
}
@Override
@@ -93,4 +96,49 @@ public final class ProfileStep<S> extends AbstractStep<S, S> implements Reversib
traversalMetrics.initializeIfNecessary(this.getId(), this.getTraversal().getSteps().indexOf(this), name, isComputer);
return traversalMetrics;
}
+
+ //////////////////
+
+ public static final class ProfileMapReduce extends StaticMapReduce<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> {
+
+ private static ProfileMapReduce INSTANCE = new ProfileMapReduce();
+
+ private ProfileMapReduce() {
+
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return true;
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return TraversalMetrics.METRICS_KEY;
+ }
+
+ @Override
+ public void map(final Vertex vertex, final MapEmitter<NullObject, StandardTraversalMetrics> emitter) {
+ VertexTraversalSideEffects.of(vertex).<StandardTraversalMetrics>ifPresent(TraversalMetrics.METRICS_KEY, emitter::emit);
+ }
+
+ @Override
+ public void combine(final NullObject key, final Iterator<StandardTraversalMetrics> values, final ReduceEmitter<NullObject, StandardTraversalMetrics> emitter) {
+ reduce(key, values, emitter);
+ }
+
+ @Override
+ public void reduce(final NullObject key, final Iterator<StandardTraversalMetrics> values, final ReduceEmitter<NullObject, StandardTraversalMetrics> emitter) {
+ emitter.emit(StandardTraversalMetrics.merge(values));
+ }
+
+ @Override
+ public StandardTraversalMetrics generateFinalResult(final Iterator<KeyValue<NullObject, StandardTraversalMetrics>> keyValues) {
+ return keyValues.next().getValue();
+ }
+
+ public static ProfileMapReduce instance() {
+ return INSTANCE;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac12f999/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StoreStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StoreStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StoreStep.java
index c89df07..8ff043c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StoreStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/StoreStep.java
@@ -18,31 +18,38 @@
*/
package org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.Traversal;
import org.apache.tinkerpop.gremlin.process.Traverser;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
-import org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.StoreMapReduce;
-import org.apache.tinkerpop.gremlin.process.traversal.lambda.IdentityTraversal;
import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.Reversible;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.process.util.BulkSet;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.function.BulkSetSupplier;
import java.util.Collection;
import java.util.Collections;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.function.Supplier;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
public final class StoreStep<S> extends SideEffectStep<S> implements SideEffectCapable, Reversible, TraversalParent, MapReducer<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
- private Traversal.Admin<S, Object> storeTraversal = new IdentityTraversal<>();
+ private Traversal.Admin<S, Object> storeTraversal = null;
private String sideEffectKey;
public StoreStep(final Traversal.Admin traversal, final String sideEffectKey) {
@@ -55,7 +62,7 @@ public final class StoreStep<S> extends SideEffectStep<S> implements SideEffectC
protected void sideEffect(final Traverser.Admin<S> traverser) {
TraversalHelper.addToCollection(
traverser.sideEffects(this.sideEffectKey),
- TraversalUtil.apply(traverser.asAdmin(), this.storeTraversal),
+ TraversalUtil.applyNullable(traverser.asAdmin(), this.storeTraversal),
traverser.bulk());
}
@@ -76,7 +83,7 @@ public final class StoreStep<S> extends SideEffectStep<S> implements SideEffectC
@Override
public List<Traversal.Admin<S, Object>> getLocalChildren() {
- return Collections.singletonList(this.storeTraversal);
+ return null == this.storeTraversal ? Collections.emptyList() : Collections.singletonList(this.storeTraversal);
}
@Override
@@ -92,7 +99,62 @@ public final class StoreStep<S> extends SideEffectStep<S> implements SideEffectC
@Override
public StoreStep<S> clone() throws CloneNotSupportedException {
final StoreStep<S> clone = (StoreStep<S>) super.clone();
- clone.storeTraversal = clone.integrateChild(this.storeTraversal.clone());
+ if (null != this.storeTraversal)
+ clone.storeTraversal = clone.integrateChild(this.storeTraversal.clone());
return clone;
}
+
+ ///////////////
+
+ public static final class StoreMapReduce extends StaticMapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
+
+ public static final String STORE_STEP_SIDE_EFFECT_KEY = "gremlin.storeStep.sideEffectKey";
+
+ private String sideEffectKey;
+ private Supplier<Collection> collectionSupplier;
+
+ private StoreMapReduce() {
+
+ }
+
+ public StoreMapReduce(final StoreStep step) {
+ this.sideEffectKey = step.getSideEffectKey();
+ this.collectionSupplier = step.getTraversal().asAdmin().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ super.storeState(configuration);
+ configuration.setProperty(STORE_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
+ }
+
+ @Override
+ public void loadState(final Configuration configuration) {
+ this.sideEffectKey = configuration.getString(STORE_STEP_SIDE_EFFECT_KEY);
+ this.collectionSupplier = TraversalVertexProgram.getTraversalSupplier(configuration).get().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return stage.equals(Stage.MAP);
+ }
+
+ @Override
+ public void map(final Vertex vertex, final MapEmitter<NullObject, Object> emitter) {
+ VertexTraversalSideEffects.of(vertex).<Collection<?>>orElse(this.sideEffectKey, Collections.emptyList()).forEach(emitter::emit);
+ }
+
+ @Override
+ public Collection generateFinalResult(final Iterator<KeyValue<NullObject, Object>> keyValues) {
+ final Collection collection = this.collectionSupplier.get();
+ keyValues.forEachRemaining(pair -> collection.add(pair.getValue()));
+ return collection;
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return this.sideEffectKey;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac12f999/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeStep.java
index e9ad40f..82d3427 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/TreeStep.java
@@ -18,23 +18,27 @@
*/
package org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect;
+import org.apache.commons.configuration.Configuration;
import org.apache.tinkerpop.gremlin.process.Path;
import org.apache.tinkerpop.gremlin.process.Traversal;
import org.apache.tinkerpop.gremlin.process.Traverser;
+import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
+import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.graph.traversal.step.SideEffectCapable;
-import org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce.TreeMapReduce;
import org.apache.tinkerpop.gremlin.process.graph.util.Tree;
import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.Reversible;
-import org.apache.tinkerpop.gremlin.process.traversal.step.SideEffectRegistrar;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalRing;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
import org.apache.tinkerpop.gremlin.process.traverser.TraverserRequirement;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.function.TreeSupplier;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
@@ -111,4 +115,55 @@ public final class TreeStep<S> extends SideEffectStep<S> implements Reversible,
public Set<TraverserRequirement> getRequirements() {
return this.getSelfAndChildRequirements(TraverserRequirement.PATH, TraverserRequirement.SIDE_EFFECTS);
}
+
+ ////////////////
+
+ public static final class TreeMapReduce extends StaticMapReduce<Object, Tree, Object, Tree, Tree> {
+
+ public static final String TREE_STEP_SIDE_EFFECT_KEY = "gremlin.treeStep.sideEffectKey";
+
+ private String sideEffectKey;
+
+ private TreeMapReduce() {
+
+ }
+
+ public TreeMapReduce(final TreeStep step) {
+ this.sideEffectKey = step.getSideEffectKey();
+ }
+
+ @Override
+ public void storeState(final Configuration configuration) {
+ super.storeState(configuration);
+ configuration.setProperty(TREE_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
+ }
+
+ @Override
+ public void loadState(final Configuration configuration) {
+ this.sideEffectKey = configuration.getString(TREE_STEP_SIDE_EFFECT_KEY);
+ }
+
+ @Override
+ public boolean doStage(final Stage stage) {
+ return stage.equals(Stage.MAP);
+ }
+
+ @Override
+ public void map(final Vertex vertex, final MapEmitter<Object, Tree> emitter) {
+ VertexTraversalSideEffects.of(vertex).<Tree<?>>ifPresent(this.sideEffectKey, tree -> tree.splitParents().forEach(branches -> emitter.emit(branches.keySet().iterator().next(), branches)));
+ }
+
+ @Override
+ public Tree generateFinalResult(final Iterator<KeyValue<Object, Tree>> keyValues) {
+ final Tree result = new Tree();
+ keyValues.forEachRemaining(keyValue -> result.addTree(keyValue.getValue()));
+ return result;
+ }
+
+ @Override
+ public String getMemoryKey() {
+ return this.sideEffectKey;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac12f999/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/AggregateMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/AggregateMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/AggregateMapReduce.java
deleted file mode 100644
index fe0655a..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/AggregateMapReduce.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce;
-
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
-import org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.AggregateStep;
-import org.apache.tinkerpop.gremlin.process.util.BulkSet;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.commons.configuration.Configuration;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.function.Supplier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class AggregateMapReduce extends StaticMapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
-
- public static final String AGGREGATE_STEP_SIDE_EFFECT_KEY = "gremlin.aggregateStep.sideEffectKey";
-
- private String sideEffectKey;
- private Supplier<Collection> collectionSupplier;
-
- private AggregateMapReduce() {
-
- }
-
- public AggregateMapReduce(final AggregateStep step) {
- this.sideEffectKey = step.getSideEffectKey();
- this.collectionSupplier = step.getTraversal().asAdmin().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
- }
-
- @Override
- public void storeState(final Configuration configuration) {
- super.storeState(configuration);
- configuration.setProperty(AGGREGATE_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
- }
-
- @Override
- public void loadState(final Configuration configuration) {
- this.sideEffectKey = configuration.getString(AGGREGATE_STEP_SIDE_EFFECT_KEY);
- this.collectionSupplier = TraversalVertexProgram.getTraversalSupplier(configuration).get().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
- }
-
- @Override
- public boolean doStage(final Stage stage) {
- return stage.equals(Stage.MAP);
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<NullObject, Object> emitter) {
- VertexTraversalSideEffects.of(vertex).<Collection<?>>orElse(this.sideEffectKey, Collections.emptyList()).forEach(emitter::emit);
- }
-
- @Override
- public Collection generateFinalResult(final Iterator<KeyValue<NullObject, Object>> keyValues) {
- final Collection collection = this.collectionSupplier.get();
- keyValues.forEachRemaining(keyValue -> collection.add(keyValue.getValue()));
- return collection;
- }
-
- @Override
- public String getMemoryKey() {
- return this.sideEffectKey;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac12f999/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/ProfileMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/ProfileMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/ProfileMapReduce.java
deleted file mode 100644
index 3399b9a..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/ProfileMapReduce.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce;
-
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
-import org.apache.tinkerpop.gremlin.process.util.metric.StandardTraversalMetrics;
-import org.apache.tinkerpop.gremlin.process.util.metric.TraversalMetrics;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-
-import java.util.Iterator;
-
-public final class ProfileMapReduce extends StaticMapReduce<MapReduce.NullObject, StandardTraversalMetrics, MapReduce.NullObject, StandardTraversalMetrics, StandardTraversalMetrics> {
-
- public ProfileMapReduce() {
-
- }
-
- @Override
- public boolean doStage(final Stage stage) {
- return true;
- }
-
- @Override
- public String getMemoryKey() {
- return TraversalMetrics.METRICS_KEY;
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<NullObject, StandardTraversalMetrics> emitter) {
- VertexTraversalSideEffects.of(vertex).<StandardTraversalMetrics>ifPresent(TraversalMetrics.METRICS_KEY, emitter::emit);
- }
-
- @Override
- public void combine(final NullObject key, final Iterator<StandardTraversalMetrics> values, final ReduceEmitter<NullObject, StandardTraversalMetrics> emitter) {
- reduce(key, values, emitter);
- }
-
- @Override
- public void reduce(final NullObject key, final Iterator<StandardTraversalMetrics> values, final ReduceEmitter<NullObject, StandardTraversalMetrics> emitter) {
- emitter.emit(StandardTraversalMetrics.merge(values));
- }
-
- @Override
- public StandardTraversalMetrics generateFinalResult(final Iterator<KeyValue<NullObject, StandardTraversalMetrics>> keyValues) {
- return keyValues.next().getValue();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac12f999/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/StoreMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/StoreMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/StoreMapReduce.java
deleted file mode 100644
index 8080b71..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/StoreMapReduce.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce;
-
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
-import org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.StoreStep;
-import org.apache.tinkerpop.gremlin.process.util.BulkSet;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.commons.configuration.Configuration;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.function.Supplier;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class StoreMapReduce extends StaticMapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Collection> {
-
- public static final String STORE_STEP_SIDE_EFFECT_KEY = "gremlin.storeStep.sideEffectKey";
-
- private String sideEffectKey;
- private Supplier<Collection> collectionSupplier;
-
- private StoreMapReduce() {
-
- }
-
- public StoreMapReduce(final StoreStep step) {
- this.sideEffectKey = step.getSideEffectKey();
- this.collectionSupplier = step.getTraversal().asAdmin().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
- }
-
- @Override
- public void storeState(final Configuration configuration) {
- super.storeState(configuration);
- configuration.setProperty(STORE_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
- }
-
- @Override
- public void loadState(final Configuration configuration) {
- this.sideEffectKey = configuration.getString(STORE_STEP_SIDE_EFFECT_KEY);
- this.collectionSupplier = TraversalVertexProgram.getTraversalSupplier(configuration).get().getSideEffects().<Collection>getRegisteredSupplier(this.sideEffectKey).orElse(BulkSet::new);
- }
-
- @Override
- public boolean doStage(final Stage stage) {
- return stage.equals(Stage.MAP);
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<NullObject, Object> emitter) {
- VertexTraversalSideEffects.of(vertex).<Collection<?>>orElse(this.sideEffectKey, Collections.emptyList()).forEach(emitter::emit);
- }
-
- @Override
- public Collection generateFinalResult(final Iterator<KeyValue<NullObject, Object>> keyValues) {
- final Collection collection = this.collectionSupplier.get();
- keyValues.forEachRemaining(pair -> collection.add(pair.getValue()));
- return collection;
- }
-
- @Override
- public String getMemoryKey() {
- return this.sideEffectKey;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ac12f999/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/TreeMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/TreeMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/TreeMapReduce.java
deleted file mode 100644
index bdf3477..0000000
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/graph/traversal/step/sideEffect/mapreduce/TreeMapReduce.java
+++ /dev/null
@@ -1,80 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.mapreduce;
-
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.VertexTraversalSideEffects;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
-import org.apache.tinkerpop.gremlin.process.graph.traversal.step.sideEffect.TreeStep;
-import org.apache.tinkerpop.gremlin.process.graph.util.Tree;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
-import org.apache.commons.configuration.Configuration;
-
-import java.util.Iterator;
-
-/**
- * @author Marko A. Rodriguez (http://markorodriguez.com)
- */
-public final class TreeMapReduce extends StaticMapReduce<Object, Tree, Object, Tree, Tree> {
-
- public static final String TREE_STEP_SIDE_EFFECT_KEY = "gremlin.treeStep.sideEffectKey";
-
- private String sideEffectKey;
-
- private TreeMapReduce() {
-
- }
-
- public TreeMapReduce(final TreeStep step) {
- this.sideEffectKey = step.getSideEffectKey();
- }
-
- @Override
- public void storeState(final Configuration configuration) {
- super.storeState(configuration);
- configuration.setProperty(TREE_STEP_SIDE_EFFECT_KEY, this.sideEffectKey);
- }
-
- @Override
- public void loadState(final Configuration configuration) {
- this.sideEffectKey = configuration.getString(TREE_STEP_SIDE_EFFECT_KEY);
- }
-
- @Override
- public boolean doStage(final Stage stage) {
- return stage.equals(Stage.MAP);
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<Object, Tree> emitter) {
- VertexTraversalSideEffects.of(vertex).<Tree<?>>ifPresent(this.sideEffectKey, tree -> tree.splitParents().forEach(branches -> emitter.emit(branches.keySet().iterator().next(), branches)));
- }
-
- @Override
- public Tree generateFinalResult(final Iterator<KeyValue<Object, Tree>> keyValues) {
- final Tree result = new Tree();
- keyValues.forEachRemaining(keyValue -> result.addTree(keyValue.getValue()));
- return result;
- }
-
- @Override
- public String getMemoryKey() {
- return this.sideEffectKey;
- }
-}