You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/22 22:41:11 UTC
incubator-tinkerpop git commit: Have all the ReducingBarrierSteps no
longer implement MapReduce and use MemoryComputeKeys for their reduction.
GroupStep, FoldStep,
and GroupStepV3d0 are not complete yet. Having the darndest time with
GroupStep -- once
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/TINKERPOP-1166 b03c1adc4 -> 65bbdaa33
Have all the ReducingBarrierSteps no longer implement MapReduce and instead use MemoryComputeKeys for their reduction. GroupStep, FoldStep, and GroupStepV3d0 are not complete yet. Having the darndest time with GroupStep -- once I get it, the others will follow from it. Pushing to save work.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/65bbdaa3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/65bbdaa3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/65bbdaa3
Branch: refs/heads/TINKERPOP-1166
Commit: 65bbdaa336b4034421f7e2599bb3f2c307aa4773
Parents: b03c1ad
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Feb 22 14:41:02 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Feb 22 14:41:02 2016 -0700
----------------------------------------------------------------------
.../traversal/TraversalVertexProgram.java | 31 ++++--
.../computer/traversal/TraverserExecutor.java | 28 +++--
.../traversal/step/map/CountGlobalStep.java | 101 ++----------------
.../process/traversal/step/map/FoldStep.java | 41 ++++----
.../traversal/step/map/GroupCountStep.java | 94 +++--------------
.../process/traversal/step/map/GroupStep.java | 11 +-
.../traversal/step/map/GroupStepV3d0.java | 9 +-
.../traversal/step/map/MaxGlobalStep.java | 103 +++----------------
.../traversal/step/map/MeanGlobalStep.java | 102 ++----------------
.../traversal/step/map/MinGlobalStep.java | 100 +++---------------
.../traversal/step/map/SumGlobalStep.java | 98 ++----------------
.../process/traversal/step/map/TreeStep.java | 97 ++++-------------
.../step/util/ReducingBarrierStep.java | 29 +++---
13 files changed, 176 insertions(+), 668 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index ad71f8a..d5059a7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.FinalGet;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
@@ -80,7 +81,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
// TODO: if not an adjacent traversal, use Local message scope -- a dual messaging system.
private static final Set<MessageScope> MESSAGE_SCOPES = new HashSet<>(Collections.singletonList(MessageScope.Global.instance()));
- private static final Set<VertexComputeKey> ELEMENT_COMPUTE_KEYS = new HashSet<>(Arrays.asList(
+ private Set<MemoryComputeKey> memoryComputeKeys = new HashSet<>();
+ private static final Set<VertexComputeKey> VERTEX_COMPUTE_KEYS = new HashSet<>(Arrays.asList(
VertexComputeKey.of(HALTED_TRAVERSERS, false),
VertexComputeKey.of(TraversalSideEffects.SIDE_EFFECTS, false)));
@@ -110,11 +112,18 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
if (!this.traversal.get().isLocked())
this.traversal.get().applyStrategies();
this.traversalMatrix = new TraversalMatrix<>(this.traversal.get());
+ this.memoryComputeKeys.add(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), true));
for (final MapReducer<?, ?, ?, ?, ?> mapReducer : TraversalHelper.getStepsOfAssignableClassRecursively(MapReducer.class, this.traversal.get())) {
this.mapReducers.add(mapReducer.getMapReduce());
+ this.memoryComputeKeys.add(MemoryComputeKey.of(mapReducer.getMapReduce().getMemoryKey(), MemoryComputeKey.setOperator(), false));
}
- if (!(this.traversal.get().getEndStep() instanceof SideEffectCapStep) && !(this.traversal.get().getEndStep() instanceof ReducingBarrierStep))
+ if (!(this.traversal.get().getEndStep() instanceof SideEffectCapStep) && !(this.traversal.get().getEndStep() instanceof ReducingBarrierStep)) {
this.mapReducers.add(new TraverserMapReduce(this.traversal.get()));
+ this.memoryComputeKeys.add(MemoryComputeKey.of(TraverserMapReduce.TRAVERSERS, MemoryComputeKey.setOperator(), false));
+ }
+ for (final ReducingBarrierStep<?, ?> reducingBarrierStep : TraversalHelper.getStepsOfAssignableClassRecursively(ReducingBarrierStep.class, this.traversal.get())) {
+ this.memoryComputeKeys.add(MemoryComputeKey.of(ReducingBarrierStep.REDUCING, reducingBarrierStep.getBiOperator(), false));
+ }
}
@Override
@@ -126,6 +135,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
@Override
public void setup(final Memory memory) {
memory.set(VOTE_TO_HALT, true);
+ for (final ReducingBarrierStep<?, ?> reducingBarrierStep : TraversalHelper.getStepsOfAssignableClassRecursively(ReducingBarrierStep.class, this.traversal.get())) {
+ memory.set(ReducingBarrierStep.REDUCING, reducingBarrierStep.getSeedSupplier().get());
+ }
}
@Override
@@ -162,9 +174,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
aliveTraverses.add((Traverser.Admin) traverser);
});
}
- memory.add(VOTE_TO_HALT, aliveTraverses.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, aliveTraverses), this.traversalMatrix));
+ memory.add(VOTE_TO_HALT, aliveTraverses.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, aliveTraverses), this.traversalMatrix, memory));
} else { // ITERATION 1+
- memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix));
+ memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix, memory));
}
}
@@ -172,6 +184,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
public boolean terminate(final Memory memory) {
final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT);
if (voteToHalt) {
+ if (memory.exists(ReducingBarrierStep.REDUCING))
+ memory.set(ReducingBarrierStep.REDUCING, FinalGet.tryFinalGet(memory.get(ReducingBarrierStep.REDUCING)));
return true;
} else {
memory.set(VOTE_TO_HALT, true);
@@ -181,17 +195,12 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
@Override
public Set<VertexComputeKey> getVertexComputeKeys() {
- return ELEMENT_COMPUTE_KEYS;
+ return VERTEX_COMPUTE_KEYS;
}
@Override
public Set<MemoryComputeKey> getMemoryComputeKeys() {
- final Set<MemoryComputeKey> memoryComputeKeys = new HashSet<>();
- memoryComputeKeys.add(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), true));
- for (final MapReduce mapReduce : this.mapReducers) {
- memoryComputeKeys.add(MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(), false));
- }
- return memoryComputeKeys;
+ return this.memoryComputeKeys;
}
@Override
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
index 078e880..4660183 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
@@ -18,12 +18,14 @@
*/
package org.apache.tinkerpop.gremlin.process.computer.traversal;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
import org.apache.tinkerpop.gremlin.process.computer.Messenger;
import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -40,7 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
*/
public final class TraverserExecutor {
- public static boolean execute(final Vertex vertex, final Messenger<TraverserSet<?>> messenger, final TraversalMatrix<?, ?> traversalMatrix) {
+ public static boolean execute(final Vertex vertex, final Messenger<TraverserSet<?>> messenger, final TraversalMatrix<?, ?> traversalMatrix, final Memory memory) {
final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
final AtomicBoolean voteToHalt = new AtomicBoolean(true);
@@ -69,11 +71,11 @@ public final class TraverserExecutor {
traversers.remove();
final Step<?, ?> currentStep = traversalMatrix.getStepById(traverser.getStepId());
if (!currentStep.getId().equals(previousStep.getId()))
- TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers);
+ TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers, memory);
currentStep.addStart((Traverser.Admin) traverser);
previousStep = currentStep;
}
- TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers);
+ TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers, memory);
assert toProcessTraversers.isEmpty();
// process all the local objects and send messages or store locally again
if (!aliveTraversers.isEmpty()) {
@@ -102,14 +104,18 @@ public final class TraverserExecutor {
return voteToHalt.get();
}
- private static void drainStep(final Step<?, ?> step, final TraverserSet<?> aliveTraversers, final TraverserSet<?> haltedTraversers) {
- step.forEachRemaining(traverser -> {
- if (traverser.asAdmin().isHalted()) {
- traverser.asAdmin().detach();
- haltedTraversers.add((Traverser.Admin) traverser);
- } else
- aliveTraversers.add((Traverser.Admin) traverser);
- });
+ private static void drainStep(final Step<?, ?> step, final TraverserSet<?> aliveTraversers, final TraverserSet<?> haltedTraversers, final Memory memory) {
+ if (step instanceof ReducingBarrierStep) {
+ memory.add(ReducingBarrierStep.REDUCING, step.next().get());
+ } else {
+ step.forEachRemaining(traverser -> {
+ if (traverser.asAdmin().isHalted()) {
+ traverser.asAdmin().detach();
+ haltedTraversers.add((Traverser.Admin) traverser);
+ } else
+ aliveTraversers.add((Traverser.Admin) traverser);
+ });
+ }
}
private static Vertex getHostingVertex(final Object object) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
index 096580d..4d72146 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
@@ -18,128 +18,49 @@
*/
package org.apache.tinkerpop.gremlin.process.traversal.step.map;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.io.Serializable;
-import java.util.Collections;
import java.util.EnumSet;
-import java.util.Iterator;
import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class CountGlobalStep<S> extends ReducingBarrierStep<S, Long> implements MapReducer {
+public final class CountGlobalStep<S> extends ReducingBarrierStep<S, Long> {
private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(TraverserRequirement.BULK);
public CountGlobalStep(final Traversal.Admin traversal) {
super(traversal);
this.setSeedSupplier(new ConstantSupplier<>(0L));
- this.setBiFunction(CountBiFunction.<S>instance());
+ this.setReducingBiOperator(new CountBiOperator());
}
-
@Override
- public Set<TraverserRequirement> getRequirements() {
- return REQUIREMENTS;
+ public Long projectTraverser(final Traverser.Admin<S> traverser) {
+ return traverser.bulk();
}
+
@Override
- public MapReduce<MapReduce.NullObject, Long, MapReduce.NullObject, Long, Long> getMapReduce() {
- return CountGlobalMapReduce.instance();
+ public Set<TraverserRequirement> getRequirements() {
+ return REQUIREMENTS;
}
///////////
- private static class CountBiFunction<S> implements BiFunction<Long, Traverser<S>, Long>, Serializable {
-
- private static final CountBiFunction INSTANCE = new CountBiFunction();
-
- private CountBiFunction() {
-
- }
+ private static class CountBiOperator implements BinaryOperator<Long>, Serializable {
@Override
- public Long apply(final Long mutatingSeed, final Traverser<S> traverser) {
- return mutatingSeed + traverser.bulk();
- }
-
- public final static <S> CountBiFunction<S> instance() {
- return INSTANCE;
+ public Long apply(final Long mutatingSeed, final Long count) {
+ return mutatingSeed + count;
}
}
- ///////////
-
- private static class CountGlobalMapReduce extends StaticMapReduce<MapReduce.NullObject, Long, MapReduce.NullObject, Long, Long> {
-
- private static CountGlobalMapReduce INSTANCE = new CountGlobalMapReduce();
-
- private CountGlobalMapReduce() {
-
- }
-
- @Override
- public boolean doStage(final MapReduce.Stage stage) {
- return true;
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<NullObject, Long> emitter) {
- final Iterator<Long> values = IteratorUtils.map(vertex.<Set<Traverser.Admin<Long>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).orElse(Collections.emptySet()).iterator(),
- traverser -> traverser.get() * traverser.bulk());
- long count = getCount(values);
- if (count > 0)
- emitter.emit(count);
- }
-
- @Override
- public void combine(final NullObject key, final Iterator<Long> values, final ReduceEmitter<NullObject, Long> emitter) {
- this.reduce(key, values, emitter);
- }
-
- @Override
- public void reduce(final NullObject key, final Iterator<Long> values, final ReduceEmitter<NullObject, Long> emitter) {
- long count = getCount(values);
- if (count > 0)
- emitter.emit(count);
- }
-
- private Long getCount(final Iterator<Long> longs) {
- long count = 0l;
- while (longs.hasNext()) {
- count = count + longs.next();
- }
- return count;
- }
-
- @Override
- public String getMemoryKey() {
- return REDUCING;
- }
-
- @Override
- public Long generateFinalResult(final Iterator<KeyValue<NullObject, Long>> keyValues) {
- return keyValues.hasNext() ? keyValues.next().getValue() : 0L;
- }
-
- public static final CountGlobalMapReduce instance() {
- return INSTANCE;
- }
-
- }
-
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
index c61f89f..ca45767 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
@@ -25,10 +25,12 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequire
import org.apache.tinkerpop.gremlin.util.function.ArrayListSupplier;
import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collections;
import java.util.EnumSet;
+import java.util.List;
import java.util.Set;
import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
import java.util.function.Supplier;
/**
@@ -39,13 +41,18 @@ public final class FoldStep<S, E> extends ReducingBarrierStep<S, E> {
private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(TraverserRequirement.OBJECT);
public FoldStep(final Traversal.Admin traversal) {
- this(traversal, (Supplier) ArrayListSupplier.instance(), (BiFunction) ArrayListBiFunction.instance());
+ this(traversal, (Supplier) ArrayListSupplier.instance(), (BiFunction) new ListBiOperator<>());
+ }
+
+ @Override
+ public E projectTraverser(final Traverser.Admin<S> traverser) {
+ return (E) Collections.singletonList(traverser.get());
}
public FoldStep(final Traversal.Admin traversal, final Supplier<E> seed, final BiFunction<E, S, E> foldFunction) {
super(traversal);
this.setSeedSupplier(seed);
- this.setBiFunction(new FoldBiFunction<>(foldFunction));
+ this.setReducingBiOperator(new FoldBiOperator<>(foldFunction));
}
@Override
@@ -55,41 +62,29 @@ public final class FoldStep<S, E> extends ReducingBarrierStep<S, E> {
/////////
- private static class ArrayListBiFunction<S> implements BiFunction<ArrayList<S>, S, ArrayList<S>>, Serializable {
-
- private static final ArrayListBiFunction INSTANCE = new ArrayListBiFunction();
-
- private ArrayListBiFunction() {
-
- }
+ private static class ListBiOperator<S> implements BinaryOperator<List<S>>, Serializable {
@Override
- public ArrayList<S> apply(final ArrayList<S> mutatingSeed, final S traverser) {
- mutatingSeed.add(traverser);
+ public List<S> apply(final List<S> mutatingSeed, final List<S> list) {
+ mutatingSeed.addAll(list);
return mutatingSeed;
}
- public final static <S> ArrayListBiFunction<S> instance() {
- return INSTANCE;
- }
}
///////
- public static class FoldBiFunction<S, E> implements BiFunction<E, Traverser<S>, E>, Serializable {
+ public static class FoldBiOperator<E> implements BinaryOperator<E>, Serializable {
- private final BiFunction<E, S, E> biFunction;
+ private final BiFunction biFunction;
- public FoldBiFunction(final BiFunction<E, S, E> biFunction) {
+ public FoldBiOperator(final BiFunction biFunction) {
this.biFunction = biFunction;
}
@Override
- public E apply(E seed, final Traverser<S> traverser) {
- for (int i = 0; i < traverser.bulk(); i++) {
- seed = this.biFunction.apply(seed, traverser.get());
- }
- return seed;
+ public E apply(E seed, E other) {
+ return (E) this.biFunction.apply(seed, other);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
index 6c7762c..e669bca 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
@@ -18,44 +18,36 @@
*/
package org.apache.tinkerpop.gremlin.process.traversal.step.map;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.MapHelper;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
import java.io.Serializable;
import java.util.Collections;
import java.util.HashMap;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Long>> implements MapReducer, TraversalParent, ByModulating {
+public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Long>> implements TraversalParent, ByModulating {
private Traversal.Admin<S, E> groupTraversal = null;
public GroupCountStep(final Traversal.Admin traversal) {
super(traversal);
this.setSeedSupplier(HashMapSupplier.instance());
- this.setBiFunction(new GroupCountBiFunction(this));
+ this.setReducingBiOperator(new GroupCountBiOperator());
}
@@ -69,14 +61,15 @@ public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Lo
return null == this.groupTraversal ? Collections.emptyList() : Collections.singletonList(this.groupTraversal);
}
- @Override
- public Set<TraverserRequirement> getRequirements() {
- return this.getSelfAndChildRequirements(TraverserRequirement.BULK);
+ public Map<E, Long> projectTraverser(final Traverser.Admin<S> traverser) {
+ final Map<E, Long> map = new HashMap<>(); // TODO: make singleton map
+ map.put(TraversalUtil.applyNullable(traverser, this.groupTraversal), traverser.bulk());
+ return map;
}
@Override
- public MapReduce<E, Long, E, Long, Map<E, Long>> getMapReduce() {
- return GroupCountMapReduce.instance();
+ public Set<TraverserRequirement> getRequirements() {
+ return this.getSelfAndChildRequirements(TraverserRequirement.BULK);
}
@Override
@@ -84,7 +77,6 @@ public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Lo
final GroupCountStep<S, E> clone = (GroupCountStep<S, E>) super.clone();
if (null != this.groupTraversal)
clone.groupTraversal = clone.integrateChild(this.groupTraversal.clone());
- clone.setBiFunction(new GroupCountBiFunction<>(clone));
return clone;
}
@@ -104,74 +96,12 @@ public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Lo
///////////
- private static class GroupCountBiFunction<S, E> implements BiFunction<Map<E, Long>, Traverser<S>, Map<E, Long>>, Serializable {
-
- private final GroupCountStep<S, E> groupCountStep;
-
- private GroupCountBiFunction(final GroupCountStep<S, E> groupCountStep) {
- this.groupCountStep = groupCountStep;
-
- }
+ private static class GroupCountBiOperator<E> implements BinaryOperator<Map<E, Long>>, Serializable {
@Override
- public Map<E, Long> apply(final Map<E, Long> mutatingSeed, final Traverser<S> traverser) {
- MapHelper.incr(mutatingSeed, TraversalUtil.applyNullable(traverser.asAdmin(), this.groupCountStep.groupTraversal), traverser.bulk());
+ public Map<E, Long> apply(final Map<E, Long> mutatingSeed, final Map<E, Long> map) {
+ map.forEach((k, v) -> MapHelper.incr(mutatingSeed, k, v));
return mutatingSeed;
}
}
-
- ///////////
-
- public static final class GroupCountMapReduce<E> extends StaticMapReduce<E, Long, E, Long, Map<E, Long>> {
-
- private static final GroupCountMapReduce INSTANCE = new GroupCountMapReduce();
-
- private GroupCountMapReduce() {
-
- }
-
- @Override
- public boolean doStage(final Stage stage) {
- return true;
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<E, Long> emitter) {
- final Map<E, Long> groupCount = new HashMap<>();
- vertex.<TraverserSet<Map<E, Long>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet ->
- traverserSet.forEach(traverser ->
- traverser.get().forEach((k, v) -> MapHelper.incr(groupCount, k, (v * traverser.bulk())))));
- groupCount.forEach(emitter::emit);
- }
-
- @Override
- public void reduce(final E key, final Iterator<Long> values, final ReduceEmitter<E, Long> emitter) {
- long counter = 0;
- while (values.hasNext()) {
- counter = counter + values.next();
- }
- emitter.emit(key, counter);
- }
-
- @Override
- public void combine(final E key, final Iterator<Long> values, final ReduceEmitter<E, Long> emitter) {
- reduce(key, values, emitter);
- }
-
- @Override
- public Map<E, Long> generateFinalResult(final Iterator<KeyValue<E, Long>> keyValues) {
- final Map<E, Long> map = new HashMap<>();
- keyValues.forEachRemaining(keyValue -> map.put(keyValue.getKey(), keyValue.getValue()));
- return map;
- }
-
- @Override
- public String getMemoryKey() {
- return REDUCING;
- }
-
- public static final <E> GroupCountMapReduce<E> instance() {
- return INSTANCE;
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index de3fd28..a673fea 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -70,7 +70,12 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
public GroupStep(final Traversal.Admin traversal) {
super(traversal);
this.setSeedSupplier((Supplier) new GroupStepHelper.GroupMapSupplier());
- this.setBiFunction(new GroupBiFunction(this));
+ //this.setBiFunction(new GroupBiFunction(this));
+ }
+
+ @Override
+ public Map<K, V> projectTraverser(Traverser.Admin<S> traverser) {
+ return new HashMap<>();
}
@Override
@@ -90,7 +95,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
}
@Override
- public void modulateBy(final Traversal.Admin<?,?> kvTraversal) {
+ public void modulateBy(final Traversal.Admin<?, ?> kvTraversal) {
if ('k' == this.state) {
this.keyTraversal = this.integrateChild(kvTraversal);
this.state = 'v';
@@ -118,7 +123,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
clone.valueReduceTraversal = clone.integrateChild(this.valueReduceTraversal.clone());
clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone());
clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone());
- clone.setBiFunction(new GroupBiFunction<>((GroupStep) clone));
+ //clone.setBiFunction(new GroupBiFunction<>((GroupStep) clone));
return clone;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
index d28d4e3..a2b7f5e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
@@ -69,7 +69,12 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
public GroupStepV3d0(final Traversal.Admin traversal) {
super(traversal);
this.setSeedSupplier((Supplier) new GroupMapSupplierV3d0());
- this.setBiFunction(new GroupBiFunction(this));
+ // this.setBiFunction(new GroupBiFunction(this));
+ }
+
+ @Override
+ public Map<K, R> projectTraverser(Traverser.Admin<S> traverser) {
+ return null;
}
@Override
@@ -123,7 +128,7 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone());
if (null != this.reduceTraversal)
clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone());
- clone.setBiFunction(new GroupBiFunction<>((GroupStepV3d0) clone));
+ // clone.setBiFunction(new GroupBiFunction<>((GroupStepV3d0) clone));
return clone;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
index c552bab..7db28e5 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
@@ -18,124 +18,49 @@
*/
package org.apache.tinkerpop.gremlin.process.traversal.step.map;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.io.Serializable;
import java.util.Collections;
-import java.util.Iterator;
import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.max;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class MaxGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> implements MapReducer {
+public final class MaxGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> {
+
+
+ private static final Double NAN = Double.valueOf(Double.NaN);
public MaxGlobalStep(final Traversal.Admin traversal) {
super(traversal);
- this.setSeedSupplier(new ConstantSupplier<>(null));
- this.setBiFunction(MaxGlobalBiFunction.<S>instance());
+ this.setSeedSupplier(new ConstantSupplier<>((S) NAN));
+ this.setReducingBiOperator(new MaxGlobalBiOperator<>());
}
@Override
- public Set<TraverserRequirement> getRequirements() {
- return Collections.singleton(TraverserRequirement.OBJECT);
+ public S projectTraverser(final Traverser.Admin<S> traverser) {
+ return traverser.get();
}
@Override
- public MapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> getMapReduce() {
- return MaxGlobalMapReduce.instance();
+ public Set<TraverserRequirement> getRequirements() {
+ return Collections.singleton(TraverserRequirement.OBJECT);
}
/////
- private static class MaxGlobalBiFunction<S extends Number> implements BiFunction<S, Traverser<S>, S>, Serializable {
-
- private static final MaxGlobalBiFunction INSTANCE = new MaxGlobalBiFunction();
-
- private MaxGlobalBiFunction() {
-
- }
-
+ private static class MaxGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
@Override
- public S apply(final S mutatingSeed, final Traverser<S> traverser) {
- final S value = traverser.get();
- return mutatingSeed != null ? (S) max(mutatingSeed, traverser.get()) : value;
- }
-
- public static <S extends Number> MaxGlobalBiFunction<S> instance() {
- return INSTANCE;
- }
- }
-
- ///////////
-
- private static class MaxGlobalMapReduce extends StaticMapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> {
-
- private static final MaxGlobalMapReduce INSTANCE = new MaxGlobalMapReduce();
-
- private MaxGlobalMapReduce() {
-
- }
-
- @Override
- public boolean doStage(final MapReduce.Stage stage) {
- return true;
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<NullObject, Number> emitter) {
- final Iterator<Number> values = IteratorUtils.map(vertex.<Set<Traverser.Admin<Number>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).orElse(Collections.emptySet()).iterator(), Traverser.Admin::get);
- if (values.hasNext())
- emitter.emit(getMax(values));
- }
-
- @Override
- public void combine(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
- this.reduce(key, values, emitter);
- }
-
- @Override
- public void reduce(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
- if (values.hasNext())
- emitter.emit(getMax(values));
- }
-
- private Number getMax(final Iterator<Number> numbers) {
- Number max = null;
- while (numbers.hasNext()) {
- final Number value = numbers.next();
- max = max != null ? max(value, max) : value;
- }
- return max;
- }
-
- @Override
- public String getMemoryKey() {
- return REDUCING;
- }
-
- @Override
- public Number generateFinalResult(final Iterator<KeyValue<NullObject, Number>> keyValues) {
- return keyValues.hasNext() ? keyValues.next().getValue() : Double.NaN;
-
- }
-
- public static MaxGlobalMapReduce instance() {
- return INSTANCE;
+ public S apply(final S mutatingSeed, final S number) {
+ return !NAN.equals(mutatingSeed) ? (S) max(mutatingSeed, number) : number;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
index a4e4272..53688cd 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
@@ -18,29 +18,20 @@
*/
package org.apache.tinkerpop.gremlin.process.traversal.step.map;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.traversal.NumberHelper;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.FinalGet;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.function.MeanNumberSupplier;
import java.io.Serializable;
import java.util.EnumSet;
-import java.util.Iterator;
import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
import java.util.function.Supplier;
-import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.add;
import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.div;
import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.mul;
@@ -48,109 +39,36 @@ import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.mul;
* @author Marko A. Rodriguez (http://markorodriguez.com)
* @author Daniel Kuppitz (http://gremlin.guru)
*/
-public final class MeanGlobalStep<S extends Number, E extends Number> extends ReducingBarrierStep<S, E> implements MapReducer {
+public final class MeanGlobalStep<S extends Number, E extends Number> extends ReducingBarrierStep<S, E> {
private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(TraverserRequirement.OBJECT, TraverserRequirement.BULK);
public MeanGlobalStep(final Traversal.Admin traversal) {
super(traversal);
this.setSeedSupplier((Supplier) MeanNumberSupplier.instance());
- this.setBiFunction((BiFunction) MeanGlobalBiFunction.instance());
+ this.setReducingBiOperator(new MeanGlobalBiOperator<>());
}
@Override
- public Set<TraverserRequirement> getRequirements() {
- return REQUIREMENTS;
+ public E projectTraverser(final Traverser.Admin<S> traverser) {
+ return (E) new MeanNumber(traverser.get(), traverser.bulk());
}
@Override
- public MapReduce<Number, Long, Number, Long, Number> getMapReduce() {
- return MeanGlobalMapReduce.instance();
+ public Set<TraverserRequirement> getRequirements() {
+ return REQUIREMENTS;
}
/////
- private static class MeanGlobalBiFunction<S extends Number> implements BiFunction<S, Traverser<S>, S>, Serializable {
-
- private static final MeanGlobalBiFunction INSTANCE = new MeanGlobalBiFunction();
-
- private MeanGlobalBiFunction() {
-
- }
-
- @Override
- public S apply(final S mutatingSeed, final Traverser<S> traverser) {
- return (S) ((MeanNumber) mutatingSeed).add(traverser.get(), traverser.bulk());
- }
-
- public static <S extends Number> MeanGlobalBiFunction<S> instance() {
- return INSTANCE;
- }
- }
-
- ///////////
-
- private static final class MeanGlobalMapReduce extends StaticMapReduce<Number, Long, Number, Long, Number> {
-
- private static final MeanGlobalMapReduce INSTANCE = new MeanGlobalMapReduce();
-
- private MeanGlobalMapReduce() {
-
- }
-
- @Override
- public boolean doStage(final MapReduce.Stage stage) {
- return true;
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<Number, Long> emitter) {
- vertex.<TraverserSet<Number>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet -> traverserSet.forEach(traverser -> emitter.emit(traverser.get(), traverser.bulk())));
- }
-
- @Override
- public void combine(final Number key, final Iterator<Long> values, final ReduceEmitter<Number, Long> emitter) {
- this.reduce(key, values, emitter);
- }
+ private static class MeanGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
@Override
- public void reduce(final Number key, final Iterator<Long> values, final ReduceEmitter<Number, Long> emitter) {
- long counter = 0;
- while (values.hasNext()) {
- counter = counter + values.next();
- }
- emitter.emit(key, counter);
- }
-
- @Override
- public String getMemoryKey() {
- return REDUCING;
- }
-
- @Override
- public Number generateFinalResult(final Iterator<KeyValue<Number, Long>> keyValues) {
- if (keyValues.hasNext()) {
- KeyValue<Number, Long> pair = keyValues.next();
- long counter = pair.getValue();
- Number result = mul(pair.getKey(), counter);
- while (keyValues.hasNext()) {
- long incr = pair.getValue();
- pair = keyValues.next();
- result = add(result, mul(pair.getKey(), incr));
- counter += incr;
- }
- return div(result, counter, true);
- }
- return Double.NaN;
- }
-
- public static MeanGlobalMapReduce instance() {
- return INSTANCE;
+ public S apply(final S mutatingSeed, final S number) {
+ return (S) (number instanceof MeanNumber ? ((MeanNumber) mutatingSeed).add((MeanNumber) number) : ((MeanNumber) mutatingSeed).add(number, 1l));
}
}
- ///
-
public static final class MeanNumber extends Number implements Comparable<Number>, FinalGet<Number> {
private long count;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
index 88dd46f..a1bc2c4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
@@ -18,122 +18,48 @@
*/
package org.apache.tinkerpop.gremlin.process.traversal.step.map;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.io.Serializable;
import java.util.Collections;
-import java.util.Iterator;
import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.min;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class MinGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> implements MapReducer {
+public final class MinGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> {
+
+ private static final Double NAN = Double.valueOf(Double.NaN);
public MinGlobalStep(final Traversal.Admin traversal) {
super(traversal);
- this.setSeedSupplier(new ConstantSupplier<>(null));
- this.setBiFunction(MinGlobalBiFunction.instance());
+ this.setSeedSupplier(new ConstantSupplier<>((S) NAN));
+ this.setReducingBiOperator(new MinGlobalBiOperator<>());
}
@Override
- public Set<TraverserRequirement> getRequirements() {
- return Collections.singleton(TraverserRequirement.OBJECT);
+ public S projectTraverser(final Traverser.Admin<S> traverser) {
+ return traverser.get();
}
@Override
- public MapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> getMapReduce() {
- return MinGlobalMapReduce.instance();
+ public Set<TraverserRequirement> getRequirements() {
+ return Collections.singleton(TraverserRequirement.OBJECT);
}
/////
- private static class MinGlobalBiFunction<S extends Number> implements BiFunction<S, Traverser<S>, S>, Serializable {
-
- private static final MinGlobalBiFunction INSTANCE = new MinGlobalBiFunction();
-
- private MinGlobalBiFunction() {
-
- }
-
+ private static class MinGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
@Override
- public S apply(final S mutatingSeed, final Traverser<S> traverser) {
- final S value = traverser.get();
- return mutatingSeed != null ? (S) min(mutatingSeed, traverser.get()) : value;
- }
-
- public static <S extends Number> MinGlobalBiFunction<S> instance() {
- return INSTANCE;
- }
- }
-
- ///////////
-
- private static class MinGlobalMapReduce extends StaticMapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> {
-
- private static final MinGlobalMapReduce INSTANCE = new MinGlobalMapReduce();
-
- private MinGlobalMapReduce() {
-
- }
-
- @Override
- public boolean doStage(final MapReduce.Stage stage) {
- return true;
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<NullObject, Number> emitter) {
- final Iterator<Number> values = IteratorUtils.map(vertex.<Set<Traverser.Admin<Number>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).orElse(Collections.emptySet()).iterator(), Traverser.Admin::get);
- if (values.hasNext())
- emitter.emit(getMin(values));
- }
-
- @Override
- public void combine(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
- this.reduce(key, values, emitter);
- }
-
- @Override
- public void reduce(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
- if (values.hasNext()) emitter.emit(getMin(values));
- }
-
- private Number getMin(final Iterator<Number> numbers) {
- Number min = null;
- while (numbers.hasNext()) {
- final Number value = numbers.next();
- min = min != null ? min(value, min) : value;
- }
- return min;
- }
-
- @Override
- public String getMemoryKey() {
- return REDUCING;
- }
-
- @Override
- public Number generateFinalResult(final Iterator<KeyValue<NullObject, Number>> keyValues) {
- return keyValues.hasNext() ? keyValues.next().getValue() : Double.NaN;
- }
-
- public static MinGlobalMapReduce instance() {
- return INSTANCE;
+ public S apply(final S mutatingSeed, final S number) {
+ return !NAN.equals(mutatingSeed) ? (S) min(mutatingSeed, number) : number;
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
index 8c13192..bea7717 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
@@ -18,25 +18,16 @@
*/
package org.apache.tinkerpop.gremlin.process.traversal.step.map;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import java.io.Serializable;
-import java.util.Collections;
import java.util.EnumSet;
-import java.util.Iterator;
import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.add;
import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.mul;
@@ -44,7 +35,7 @@ import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.mul;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class SumGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> implements MapReducer {
+public final class SumGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> {
private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
TraverserRequirement.BULK,
@@ -54,94 +45,27 @@ public final class SumGlobalStep<S extends Number> extends ReducingBarrierStep<S
public SumGlobalStep(final Traversal.Admin traversal) {
super(traversal);
this.setSeedSupplier(new ConstantSupplier<>((S) Integer.valueOf(0)));
- this.setBiFunction(SumGlobalBiFunction.instance());
+ this.setReducingBiOperator(new SumGlobalBiOperator<S>());
}
-
@Override
- public Set<TraverserRequirement> getRequirements() {
- return REQUIREMENTS;
+ public S projectTraverser(final Traverser.Admin<S> traverser) {
+ return (S) mul(traverser.get(), traverser.bulk());
}
+
@Override
- public MapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> getMapReduce() {
- return SumGlobalMapReduce.instance();
+ public Set<TraverserRequirement> getRequirements() {
+ return REQUIREMENTS;
}
/////
- private static class SumGlobalBiFunction<S extends Number> implements BiFunction<S, Traverser<S>, S>, Serializable {
-
- private static final SumGlobalBiFunction INSTANCE = new SumGlobalBiFunction<>();
-
- private SumGlobalBiFunction() {
-
- }
+ private static class SumGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
@Override
- public S apply(final S mutatingSeed, final Traverser<S> traverser) {
- return (S) add(mutatingSeed, mul(traverser.get(), traverser.bulk()));
- }
-
- public static <S extends Number> SumGlobalBiFunction<S> instance() {
- return INSTANCE;
- }
- }
-
- ///////////
-
- private static class SumGlobalMapReduce extends StaticMapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> {
-
- private static final SumGlobalMapReduce INSTANCE = new SumGlobalMapReduce();
-
- private SumGlobalMapReduce() {
-
- }
-
- @Override
- public boolean doStage(final MapReduce.Stage stage) {
- return true;
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<NullObject, Number> emitter) {
- final Iterator<Number> values = IteratorUtils.map(vertex.<Set<Traverser.Admin<Number>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).orElse(Collections.emptySet()).iterator(),
- traverser -> mul(traverser.get(), traverser.bulk()));
- if (values.hasNext())
- emitter.emit(getSum(values));
- }
-
- @Override
- public void combine(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
- this.reduce(key, values, emitter);
- }
-
- @Override
- public void reduce(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
- if (values.hasNext())
- emitter.emit(getSum(values));
- }
-
- private Number getSum(final Iterator<Number> numbers) {
- Number sum = numbers.next();
- while (numbers.hasNext()) {
- sum = add(sum, numbers.next());
- }
- return sum;
- }
-
- @Override
- public String getMemoryKey() {
- return REDUCING;
- }
-
- @Override
- public Number generateFinalResult(final Iterator<KeyValue<NullObject, Number>> keyValues) {
- return keyValues.hasNext() ? keyValues.next().getValue() : 0;
- }
-
- public static SumGlobalMapReduce instance() {
- return INSTANCE;
+ public S apply(final S mutatingSeed, final S number) {
+ return (S) add(mutatingSeed, number);
}
}
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
index 07d8112..fa41210 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
@@ -18,45 +18,37 @@
*/
package org.apache.tinkerpop.gremlin.process.traversal.step.map;
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
import org.apache.tinkerpop.gremlin.process.traversal.Path;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.step.PathProcessor;
import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
import org.apache.tinkerpop.gremlin.process.traversal.step.util.Tree;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalRing;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
import org.apache.tinkerpop.gremlin.util.function.TreeSupplier;
import java.io.Serializable;
-import java.util.Iterator;
import java.util.List;
import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
import java.util.function.Supplier;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements MapReducer, TraversalParent, ByModulating, PathProcessor {
+public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements TraversalParent, ByModulating, PathProcessor {
private TraversalRing<Object, Object> traversalRing = new TraversalRing<>();
public TreeStep(final Traversal.Admin traversal) {
super(traversal);
this.setSeedSupplier((Supplier) TreeSupplier.instance());
- this.setBiFunction(new TreeBiFunction(this));
+ this.setReducingBiOperator(new TreeBiOperator());
}
@@ -76,16 +68,26 @@ public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements M
}
@Override
- public MapReduce<MapReduce.NullObject, Tree, MapReduce.NullObject, Tree, Tree> getMapReduce() {
- return TreeMapReduce.instance();
+ public Tree projectTraverser(final Traverser.Admin<S> traverser) {
+ final Tree topTree = new Tree();
+ Tree depth = topTree;
+ final Path path = traverser.path();
+ for (int i = 0; i < path.size(); i++) {
+ final Object object = TraversalUtil.apply(path.<Object>get(i), this.traversalRing.next());
+ if (!depth.containsKey(object))
+ depth.put(object, new Tree<>());
+ depth = (Tree) depth.get(object);
+ }
+ this.traversalRing.reset();
+ return topTree;
}
+
@Override
public TreeStep<S> clone() {
final TreeStep<S> clone = (TreeStep<S>) super.clone();
clone.traversalRing = this.traversalRing.clone();
clone.getLocalChildren().forEach(clone::integrateChild);
- clone.setBiFunction(new TreeBiFunction<>(clone));
return clone;
}
@@ -107,73 +109,12 @@ public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements M
///////////
- private static class TreeBiFunction<S> implements BiFunction<Tree, Traverser<S>, Tree>, Serializable {
-
- private final TreeStep<S> treeStep;
-
- private TreeBiFunction(final TreeStep<S> treeStep) {
- this.treeStep = treeStep;
- }
+ private static class TreeBiOperator implements BinaryOperator<Tree>, Serializable {
@Override
- public Tree apply(final Tree mutatingSeed, final Traverser<S> traverser) {
- Tree depth = mutatingSeed;
- final Path path = traverser.path();
- for (int i = 0; i < path.size(); i++) {
- final Object object = TraversalUtil.apply(path.<Object>get(i), this.treeStep.traversalRing.next());
- if (!depth.containsKey(object))
- depth.put(object, new Tree<>());
- depth = (Tree) depth.get(object);
- }
- this.treeStep.traversalRing.reset();
+ public Tree apply(final Tree mutatingSeed, final Tree tree) {
+ mutatingSeed.addTree(tree);
return mutatingSeed;
}
}
-
- ///////////
-
- public static final class TreeMapReduce extends StaticMapReduce<MapReduce.NullObject, Tree, MapReduce.NullObject, Tree, Tree> {
-
- private static final TreeMapReduce INSTANCE = new TreeMapReduce();
-
- private TreeMapReduce() {
-
- }
-
- @Override
- public boolean doStage(final Stage stage) {
- return true;
- }
-
- @Override
- public void map(final Vertex vertex, final MapEmitter<NullObject, Tree> emitter) {
- vertex.<TraverserSet<Tree>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet -> traverserSet.forEach(traverser -> emitter.emit(traverser.get())));
- }
-
- @Override
- public void combine(final NullObject key, final Iterator<Tree> values, final ReduceEmitter<NullObject, Tree> emitter) {
- this.reduce(key, values, emitter);
- }
-
- @Override
- public void reduce(final NullObject key, final Iterator<Tree> values, final ReduceEmitter<NullObject, Tree> emitter) {
- final Tree tree = new Tree();
- values.forEachRemaining(tree::addTree);
- emitter.emit(tree);
- }
-
- @Override
- public Tree generateFinalResult(final Iterator<KeyValue<NullObject, Tree>> keyValues) {
- return keyValues.hasNext() ? keyValues.next().getValue() : new Tree();
- }
-
- @Override
- public String getMemoryKey() {
- return REDUCING;
- }
-
- public static final TreeMapReduce instance() {
- return INSTANCE;
- }
- }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
index ea94499..921385c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
@@ -28,7 +28,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step;
import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
@@ -37,21 +36,21 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
import java.util.Iterator;
import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
import java.util.function.Supplier;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
*/
-public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> implements MapReducer, Barrier {
+public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> implements Barrier {
public static final String REDUCING = Graph.Hidden.hide("reducing");
protected Supplier<E> seedSupplier;
- protected BiFunction<E, Traverser<S>, E> reducingBiFunction;
+ protected BinaryOperator<E> reducingBiOperator;
private boolean done = false;
-
private E seed = null;
public ReducingBarrierStep(final Traversal.Admin traversal) {
@@ -62,8 +61,18 @@ public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> imple
this.seedSupplier = seedSupplier;
}
- public void setBiFunction(final BiFunction<E, Traverser<S>, E> reducingBiFunction) {
- this.reducingBiFunction = reducingBiFunction;
+ public Supplier<E> getSeedSupplier() {
+ return this.seedSupplier;
+ }
+
+ public abstract E projectTraverser(final Traverser.Admin<S> traverser);
+
+ public void setReducingBiOperator(final BinaryOperator<E> reducingBiOperator) {
+ this.reducingBiOperator = reducingBiOperator;
+ }
+
+ public BinaryOperator<E> getBiOperator() {
+ return this.reducingBiOperator;
}
public void reset() {
@@ -90,7 +99,7 @@ public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> imple
public void processAllStarts() {
if (this.seed == null) this.seed = this.seedSupplier.get();
while (this.starts.hasNext())
- this.seed = this.reducingBiFunction.apply(this.seed, this.starts.next());
+ this.seed = this.reducingBiOperator.apply(this.seed, this.projectTraverser(this.starts.next()));
}
@Override
@@ -112,11 +121,6 @@ public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> imple
return clone;
}
- @Override
- public MapReduce getMapReduce() {
- return new DefaultMapReduce(this.seedSupplier, this.reducingBiFunction);
- }
-
///////
public static class DefaultMapReduce extends StaticMapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Object> {
@@ -163,7 +167,6 @@ public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> imple
@Override
public Object generateFinalResult(final Iterator keyValues) {
return ((KeyValue) keyValues.next()).getValue();
-
}
@Override