You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/22 22:41:11 UTC

incubator-tinkerpop git commit: Have all the ReducingBarrierSteps no longer implement MapReduce and using MemoryComputeKeys for their reduction. GroupStep, FoldStep, and GroupStepV3d0 are not complete yet. Having the darndest time with GroupStep -- once

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/TINKERPOP-1166 b03c1adc4 -> 65bbdaa33


Have all the ReducingBarrierSteps no longer implement MapReduce and using MemoryComputeKeys for their reduction. GroupStep, FoldStep, and GroupStepV3d0 are not complete yet. Having the darndest time with GroupStep -- once I get it, then the others will follow from it. Pushing to save work.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/65bbdaa3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/65bbdaa3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/65bbdaa3

Branch: refs/heads/TINKERPOP-1166
Commit: 65bbdaa336b4034421f7e2599bb3f2c307aa4773
Parents: b03c1ad
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Feb 22 14:41:02 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Feb 22 14:41:02 2016 -0700

----------------------------------------------------------------------
 .../traversal/TraversalVertexProgram.java       |  31 ++++--
 .../computer/traversal/TraverserExecutor.java   |  28 +++--
 .../traversal/step/map/CountGlobalStep.java     | 101 ++----------------
 .../process/traversal/step/map/FoldStep.java    |  41 ++++----
 .../traversal/step/map/GroupCountStep.java      |  94 +++--------------
 .../process/traversal/step/map/GroupStep.java   |  11 +-
 .../traversal/step/map/GroupStepV3d0.java       |   9 +-
 .../traversal/step/map/MaxGlobalStep.java       | 103 +++----------------
 .../traversal/step/map/MeanGlobalStep.java      | 102 ++----------------
 .../traversal/step/map/MinGlobalStep.java       | 100 +++---------------
 .../traversal/step/map/SumGlobalStep.java       |  98 ++----------------
 .../process/traversal/step/map/TreeStep.java    |  97 ++++-------------
 .../step/util/ReducingBarrierStep.java          |  29 +++---
 13 files changed, 176 insertions(+), 668 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
index ad71f8a..d5059a7 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraversalVertexProgram.java
@@ -37,6 +37,7 @@ import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.map.GraphStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.sideEffect.SideEffectCapStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.FinalGet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.PureTraversal;
@@ -80,7 +81,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
 
     // TODO: if not an adjacent traversal, use Local message scope -- a dual messaging system.
     private static final Set<MessageScope> MESSAGE_SCOPES = new HashSet<>(Collections.singletonList(MessageScope.Global.instance()));
-    private static final Set<VertexComputeKey> ELEMENT_COMPUTE_KEYS = new HashSet<>(Arrays.asList(
+    private Set<MemoryComputeKey> memoryComputeKeys = new HashSet<>();
+    private static final Set<VertexComputeKey> VERTEX_COMPUTE_KEYS = new HashSet<>(Arrays.asList(
             VertexComputeKey.of(HALTED_TRAVERSERS, false),
             VertexComputeKey.of(TraversalSideEffects.SIDE_EFFECTS, false)));
 
@@ -110,11 +112,18 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
         if (!this.traversal.get().isLocked())
             this.traversal.get().applyStrategies();
         this.traversalMatrix = new TraversalMatrix<>(this.traversal.get());
+        this.memoryComputeKeys.add(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), true));
         for (final MapReducer<?, ?, ?, ?, ?> mapReducer : TraversalHelper.getStepsOfAssignableClassRecursively(MapReducer.class, this.traversal.get())) {
             this.mapReducers.add(mapReducer.getMapReduce());
+            this.memoryComputeKeys.add(MemoryComputeKey.of(mapReducer.getMapReduce().getMemoryKey(), MemoryComputeKey.setOperator(), false));
         }
-        if (!(this.traversal.get().getEndStep() instanceof SideEffectCapStep) && !(this.traversal.get().getEndStep() instanceof ReducingBarrierStep))
+        if (!(this.traversal.get().getEndStep() instanceof SideEffectCapStep) && !(this.traversal.get().getEndStep() instanceof ReducingBarrierStep)) {
             this.mapReducers.add(new TraverserMapReduce(this.traversal.get()));
+            this.memoryComputeKeys.add(MemoryComputeKey.of(TraverserMapReduce.TRAVERSERS, MemoryComputeKey.setOperator(), false));
+        }
+        for (final ReducingBarrierStep<?, ?> reducingBarrierStep : TraversalHelper.getStepsOfAssignableClassRecursively(ReducingBarrierStep.class, this.traversal.get())) {
+            this.memoryComputeKeys.add(MemoryComputeKey.of(ReducingBarrierStep.REDUCING, reducingBarrierStep.getBiOperator(), false));
+        }
     }
 
     @Override
@@ -126,6 +135,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     @Override
     public void setup(final Memory memory) {
         memory.set(VOTE_TO_HALT, true);
+        for (final ReducingBarrierStep<?, ?> reducingBarrierStep : TraversalHelper.getStepsOfAssignableClassRecursively(ReducingBarrierStep.class, this.traversal.get())) {
+            memory.set(ReducingBarrierStep.REDUCING, reducingBarrierStep.getSeedSupplier().get());
+        }
     }
 
     @Override
@@ -162,9 +174,9 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
                         aliveTraverses.add((Traverser.Admin) traverser);
                 });
             }
-            memory.add(VOTE_TO_HALT, aliveTraverses.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, aliveTraverses), this.traversalMatrix));
+            memory.add(VOTE_TO_HALT, aliveTraverses.isEmpty() || TraverserExecutor.execute(vertex, new SingleMessenger<>(messenger, aliveTraverses), this.traversalMatrix, memory));
         } else {  // ITERATION 1+
-            memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix));
+            memory.add(VOTE_TO_HALT, TraverserExecutor.execute(vertex, messenger, this.traversalMatrix, memory));
         }
     }
 
@@ -172,6 +184,8 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
     public boolean terminate(final Memory memory) {
         final boolean voteToHalt = memory.<Boolean>get(VOTE_TO_HALT);
         if (voteToHalt) {
+            if (memory.exists(ReducingBarrierStep.REDUCING))
+                memory.set(ReducingBarrierStep.REDUCING, FinalGet.tryFinalGet(memory.get(ReducingBarrierStep.REDUCING)));
             return true;
         } else {
             memory.set(VOTE_TO_HALT, true);
@@ -181,17 +195,12 @@ public final class TraversalVertexProgram implements VertexProgram<TraverserSet<
 
     @Override
     public Set<VertexComputeKey> getVertexComputeKeys() {
-        return ELEMENT_COMPUTE_KEYS;
+        return VERTEX_COMPUTE_KEYS;
     }
 
     @Override
     public Set<MemoryComputeKey> getMemoryComputeKeys() {
-        final Set<MemoryComputeKey> memoryComputeKeys = new HashSet<>();
-        memoryComputeKeys.add(MemoryComputeKey.of(VOTE_TO_HALT, MemoryComputeKey.andOperator(), true));
-        for (final MapReduce mapReduce : this.mapReducers) {
-            memoryComputeKeys.add(MemoryComputeKey.of(mapReduce.getMemoryKey(), MemoryComputeKey.setOperator(), false));
-        }
-        return memoryComputeKeys;
+        return this.memoryComputeKeys;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
index 078e880..4660183 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/TraverserExecutor.java
@@ -18,12 +18,14 @@
  */
 package org.apache.tinkerpop.gremlin.process.computer.traversal;
 
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 import org.apache.tinkerpop.gremlin.process.computer.MessageScope;
 import org.apache.tinkerpop.gremlin.process.computer.Messenger;
 import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.TraversalSideEffects;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.EmptyStep;
+import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalMatrix;
 import org.apache.tinkerpop.gremlin.structure.Edge;
@@ -40,7 +42,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
  */
 public final class TraverserExecutor {
 
-    public static boolean execute(final Vertex vertex, final Messenger<TraverserSet<?>> messenger, final TraversalMatrix<?, ?> traversalMatrix) {
+    public static boolean execute(final Vertex vertex, final Messenger<TraverserSet<?>> messenger, final TraversalMatrix<?, ?> traversalMatrix, final Memory memory) {
 
         final TraverserSet<Object> haltedTraversers = vertex.value(TraversalVertexProgram.HALTED_TRAVERSERS);
         final AtomicBoolean voteToHalt = new AtomicBoolean(true);
@@ -69,11 +71,11 @@ public final class TraverserExecutor {
                 traversers.remove();
                 final Step<?, ?> currentStep = traversalMatrix.getStepById(traverser.getStepId());
                 if (!currentStep.getId().equals(previousStep.getId()))
-                    TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers);
+                    TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers, memory);
                 currentStep.addStart((Traverser.Admin) traverser);
                 previousStep = currentStep;
             }
-            TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers);
+            TraverserExecutor.drainStep(previousStep, aliveTraversers, haltedTraversers, memory);
             assert toProcessTraversers.isEmpty();
             // process all the local objects and send messages or store locally again
             if (!aliveTraversers.isEmpty()) {
@@ -102,14 +104,18 @@ public final class TraverserExecutor {
         return voteToHalt.get();
     }
 
-    private static void drainStep(final Step<?, ?> step, final TraverserSet<?> aliveTraversers, final TraverserSet<?> haltedTraversers) {
-        step.forEachRemaining(traverser -> {
-            if (traverser.asAdmin().isHalted()) {
-                traverser.asAdmin().detach();
-                haltedTraversers.add((Traverser.Admin) traverser);
-            } else
-                aliveTraversers.add((Traverser.Admin) traverser);
-        });
+    private static void drainStep(final Step<?, ?> step, final TraverserSet<?> aliveTraversers, final TraverserSet<?> haltedTraversers, final Memory memory) {
+        if (step instanceof ReducingBarrierStep) {
+            memory.add(ReducingBarrierStep.REDUCING, step.next().get());
+        } else {
+            step.forEachRemaining(traverser -> {
+                if (traverser.asAdmin().isHalted()) {
+                    traverser.asAdmin().detach();
+                    haltedTraversers.add((Traverser.Admin) traverser);
+                } else
+                    aliveTraversers.add((Traverser.Admin) traverser);
+            });
+        }
     }
 
     private static Vertex getHostingVertex(final Object object) {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
index 096580d..4d72146 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/CountGlobalStep.java
@@ -18,128 +18,49 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class CountGlobalStep<S> extends ReducingBarrierStep<S, Long> implements MapReducer {
+public final class CountGlobalStep<S> extends ReducingBarrierStep<S, Long> {
 
     private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(TraverserRequirement.BULK);
 
     public CountGlobalStep(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier(new ConstantSupplier<>(0L));
-        this.setBiFunction(CountBiFunction.<S>instance());
+        this.setReducingBiOperator(new CountBiOperator());
     }
 
-
     @Override
-    public Set<TraverserRequirement> getRequirements() {
-        return REQUIREMENTS;
+    public Long projectTraverser(final Traverser.Admin<S> traverser) {
+        return traverser.bulk();
     }
 
+
     @Override
-    public MapReduce<MapReduce.NullObject, Long, MapReduce.NullObject, Long, Long> getMapReduce() {
-        return CountGlobalMapReduce.instance();
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
     }
 
     ///////////
 
-    private static class CountBiFunction<S> implements BiFunction<Long, Traverser<S>, Long>, Serializable {
-
-        private static final CountBiFunction INSTANCE = new CountBiFunction();
-
-        private CountBiFunction() {
-
-        }
+    private static class CountBiOperator implements BinaryOperator<Long>, Serializable {
 
         @Override
-        public Long apply(final Long mutatingSeed, final Traverser<S> traverser) {
-            return mutatingSeed + traverser.bulk();
-        }
-
-        public final static <S> CountBiFunction<S> instance() {
-            return INSTANCE;
+        public Long apply(final Long mutatingSeed, final Long count) {
+            return mutatingSeed + count;
         }
     }
 
-    ///////////
-
-    private static class CountGlobalMapReduce extends StaticMapReduce<MapReduce.NullObject, Long, MapReduce.NullObject, Long, Long> {
-
-        private static CountGlobalMapReduce INSTANCE = new CountGlobalMapReduce();
-
-        private CountGlobalMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final MapReduce.Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<NullObject, Long> emitter) {
-            final Iterator<Long> values = IteratorUtils.map(vertex.<Set<Traverser.Admin<Long>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).orElse(Collections.emptySet()).iterator(),
-                    traverser -> traverser.get() * traverser.bulk());
-            long count = getCount(values);
-            if (count > 0)
-                emitter.emit(count);
-        }
-
-        @Override
-        public void combine(final NullObject key, final Iterator<Long> values, final ReduceEmitter<NullObject, Long> emitter) {
-            this.reduce(key, values, emitter);
-        }
-
-        @Override
-        public void reduce(final NullObject key, final Iterator<Long> values, final ReduceEmitter<NullObject, Long> emitter) {
-            long count = getCount(values);
-            if (count > 0)
-                emitter.emit(count);
-        }
-
-        private Long getCount(final Iterator<Long> longs) {
-            long count = 0l;
-            while (longs.hasNext()) {
-                count = count + longs.next();
-            }
-            return count;
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        @Override
-        public Long generateFinalResult(final Iterator<KeyValue<NullObject, Long>> keyValues) {
-            return keyValues.hasNext() ? keyValues.next().getValue() : 0L;
-        }
-
-        public static final CountGlobalMapReduce instance() {
-            return INSTANCE;
-        }
-
-    }
-
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
index c61f89f..ca45767 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/FoldStep.java
@@ -25,10 +25,12 @@ import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequire
 import org.apache.tinkerpop.gremlin.util.function.ArrayListSupplier;
 
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collections;
 import java.util.EnumSet;
+import java.util.List;
 import java.util.Set;
 import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 import java.util.function.Supplier;
 
 /**
@@ -39,13 +41,18 @@ public final class FoldStep<S, E> extends ReducingBarrierStep<S, E> {
     private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(TraverserRequirement.OBJECT);
 
     public FoldStep(final Traversal.Admin traversal) {
-        this(traversal, (Supplier) ArrayListSupplier.instance(), (BiFunction) ArrayListBiFunction.instance());
+        this(traversal, (Supplier) ArrayListSupplier.instance(), (BiFunction) new ListBiOperator<>());
+    }
+
+    @Override
+    public E projectTraverser(final Traverser.Admin<S> traverser) {
+        return (E) Collections.singletonList(traverser.get());
     }
 
     public FoldStep(final Traversal.Admin traversal, final Supplier<E> seed, final BiFunction<E, S, E> foldFunction) {
         super(traversal);
         this.setSeedSupplier(seed);
-        this.setBiFunction(new FoldBiFunction<>(foldFunction));
+        this.setReducingBiOperator(new FoldBiOperator<>(foldFunction));
     }
 
     @Override
@@ -55,41 +62,29 @@ public final class FoldStep<S, E> extends ReducingBarrierStep<S, E> {
 
     /////////
 
-    private static class ArrayListBiFunction<S> implements BiFunction<ArrayList<S>, S, ArrayList<S>>, Serializable {
-
-        private static final ArrayListBiFunction INSTANCE = new ArrayListBiFunction();
-
-        private ArrayListBiFunction() {
-
-        }
+    private static class ListBiOperator<S> implements BinaryOperator<List<S>>, Serializable {
 
         @Override
-        public ArrayList<S> apply(final ArrayList<S> mutatingSeed, final S traverser) {
-            mutatingSeed.add(traverser);
+        public List<S> apply(final List<S> mutatingSeed, final List<S> list) {
+            mutatingSeed.addAll(list);
             return mutatingSeed;
         }
 
-        public final static <S> ArrayListBiFunction<S> instance() {
-            return INSTANCE;
-        }
     }
 
     ///////
 
-    public static class FoldBiFunction<S, E> implements BiFunction<E, Traverser<S>, E>, Serializable {
+    public static class FoldBiOperator<E> implements BinaryOperator<E>, Serializable {
 
-        private final BiFunction<E, S, E> biFunction;
+        private final BiFunction biFunction;
 
-        public FoldBiFunction(final BiFunction<E, S, E> biFunction) {
+        public FoldBiOperator(final BiFunction biFunction) {
             this.biFunction = biFunction;
         }
 
         @Override
-        public E apply(E seed, final Traverser<S> traverser) {
-            for (int i = 0; i < traverser.bulk(); i++) {
-                seed = this.biFunction.apply(seed, traverser.get());
-            }
-            return seed;
+        public E apply(E seed, E other) {
+            return (E) this.biFunction.apply(seed, other);
         }
 
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
index 6c7762c..e669bca 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupCountStep.java
@@ -18,44 +18,36 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.MapHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.function.HashMapSupplier;
 
 import java.io.Serializable;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Long>> implements MapReducer, TraversalParent, ByModulating {
+public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Long>> implements TraversalParent, ByModulating {
 
     private Traversal.Admin<S, E> groupTraversal = null;
 
     public GroupCountStep(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier(HashMapSupplier.instance());
-        this.setBiFunction(new GroupCountBiFunction(this));
+        this.setReducingBiOperator(new GroupCountBiOperator());
     }
 
 
@@ -69,14 +61,15 @@ public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Lo
         return null == this.groupTraversal ? Collections.emptyList() : Collections.singletonList(this.groupTraversal);
     }
 
-    @Override
-    public Set<TraverserRequirement> getRequirements() {
-        return this.getSelfAndChildRequirements(TraverserRequirement.BULK);
+    public Map<E, Long> projectTraverser(final Traverser.Admin<S> traverser) {
+        final Map<E, Long> map = new HashMap<>(); // TODO: make singleton map
+        map.put(TraversalUtil.applyNullable(traverser, this.groupTraversal), traverser.bulk());
+        return map;
     }
 
     @Override
-    public MapReduce<E, Long, E, Long, Map<E, Long>> getMapReduce() {
-        return GroupCountMapReduce.instance();
+    public Set<TraverserRequirement> getRequirements() {
+        return this.getSelfAndChildRequirements(TraverserRequirement.BULK);
     }
 
     @Override
@@ -84,7 +77,6 @@ public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Lo
         final GroupCountStep<S, E> clone = (GroupCountStep<S, E>) super.clone();
         if (null != this.groupTraversal)
             clone.groupTraversal = clone.integrateChild(this.groupTraversal.clone());
-        clone.setBiFunction(new GroupCountBiFunction<>(clone));
         return clone;
     }
 
@@ -104,74 +96,12 @@ public final class GroupCountStep<S, E> extends ReducingBarrierStep<S, Map<E, Lo
 
     ///////////
 
-    private static class GroupCountBiFunction<S, E> implements BiFunction<Map<E, Long>, Traverser<S>, Map<E, Long>>, Serializable {
-
-        private final GroupCountStep<S, E> groupCountStep;
-
-        private GroupCountBiFunction(final GroupCountStep<S, E> groupCountStep) {
-            this.groupCountStep = groupCountStep;
-
-        }
+    private static class GroupCountBiOperator<E> implements BinaryOperator<Map<E, Long>>, Serializable {
 
         @Override
-        public Map<E, Long> apply(final Map<E, Long> mutatingSeed, final Traverser<S> traverser) {
-            MapHelper.incr(mutatingSeed, TraversalUtil.applyNullable(traverser.asAdmin(), this.groupCountStep.groupTraversal), traverser.bulk());
+        public Map<E, Long> apply(final Map<E, Long> mutatingSeed, final Map<E, Long> map) {
+            map.forEach((k, v) -> MapHelper.incr(mutatingSeed, k, v));
             return mutatingSeed;
         }
     }
-
-    ///////////
-
-    public static final class GroupCountMapReduce<E> extends StaticMapReduce<E, Long, E, Long, Map<E, Long>> {
-
-        private static final GroupCountMapReduce INSTANCE = new GroupCountMapReduce();
-
-        private GroupCountMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<E, Long> emitter) {
-            final Map<E, Long> groupCount = new HashMap<>();
-            vertex.<TraverserSet<Map<E, Long>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet ->
-                    traverserSet.forEach(traverser ->
-                            traverser.get().forEach((k, v) -> MapHelper.incr(groupCount, k, (v * traverser.bulk())))));
-            groupCount.forEach(emitter::emit);
-        }
-
-        @Override
-        public void reduce(final E key, final Iterator<Long> values, final ReduceEmitter<E, Long> emitter) {
-            long counter = 0;
-            while (values.hasNext()) {
-                counter = counter + values.next();
-            }
-            emitter.emit(key, counter);
-        }
-
-        @Override
-        public void combine(final E key, final Iterator<Long> values, final ReduceEmitter<E, Long> emitter) {
-            reduce(key, values, emitter);
-        }
-
-        @Override
-        public Map<E, Long> generateFinalResult(final Iterator<KeyValue<E, Long>> keyValues) {
-            final Map<E, Long> map = new HashMap<>();
-            keyValues.forEachRemaining(keyValue -> map.put(keyValue.getKey(), keyValue.getValue()));
-            return map;
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        public static final <E> GroupCountMapReduce<E> instance() {
-            return INSTANCE;
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
index de3fd28..a673fea 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStep.java
@@ -70,7 +70,12 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
     public GroupStep(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier((Supplier) new GroupStepHelper.GroupMapSupplier());
-        this.setBiFunction(new GroupBiFunction(this));
+        //this.setBiFunction(new GroupBiFunction(this));
+    }
+
+    @Override
+    public Map<K, V> projectTraverser(Traverser.Admin<S> traverser) {
+        return new HashMap<>();
     }
 
     @Override
@@ -90,7 +95,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
     }
 
     @Override
-    public void modulateBy(final Traversal.Admin<?,?> kvTraversal) {
+    public void modulateBy(final Traversal.Admin<?, ?> kvTraversal) {
         if ('k' == this.state) {
             this.keyTraversal = this.integrateChild(kvTraversal);
             this.state = 'v';
@@ -118,7 +123,7 @@ public final class GroupStep<S, K, V> extends ReducingBarrierStep<S, Map<K, V>>
         clone.valueReduceTraversal = clone.integrateChild(this.valueReduceTraversal.clone());
         clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone());
         clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone());
-        clone.setBiFunction(new GroupBiFunction<>((GroupStep) clone));
+        //clone.setBiFunction(new GroupBiFunction<>((GroupStep) clone));
         return clone;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
index d28d4e3..a2b7f5e 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/GroupStepV3d0.java
@@ -69,7 +69,12 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
     public GroupStepV3d0(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier((Supplier) new GroupMapSupplierV3d0());
-        this.setBiFunction(new GroupBiFunction(this));
+       // this.setBiFunction(new GroupBiFunction(this));
+    }
+
+    @Override
+    public Map<K, R> projectTraverser(Traverser.Admin<S> traverser) {
+        return null;
     }
 
     @Override
@@ -123,7 +128,7 @@ public final class GroupStepV3d0<S, K, V, R> extends ReducingBarrierStep<S, Map<
             clone.valueTraversal = clone.integrateChild(this.valueTraversal.clone());
         if (null != this.reduceTraversal)
             clone.reduceTraversal = clone.integrateChild(this.reduceTraversal.clone());
-        clone.setBiFunction(new GroupBiFunction<>((GroupStepV3d0) clone));
+       // clone.setBiFunction(new GroupBiFunction<>((GroupStepV3d0) clone));
         return clone;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
index c552bab..7db28e5 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MaxGlobalStep.java
@@ -18,124 +18,49 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 
 import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.max;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MaxGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> implements MapReducer {
+public final class MaxGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> {
+
+
+    private static final Double NAN = Double.valueOf(Double.NaN);
 
     public MaxGlobalStep(final Traversal.Admin traversal) {
         super(traversal);
-        this.setSeedSupplier(new ConstantSupplier<>(null));
-        this.setBiFunction(MaxGlobalBiFunction.<S>instance());
+        this.setSeedSupplier(new ConstantSupplier<>((S) NAN));
+        this.setReducingBiOperator(new MaxGlobalBiOperator<>());
     }
 
     @Override
-    public Set<TraverserRequirement> getRequirements() {
-        return Collections.singleton(TraverserRequirement.OBJECT);
+    public S projectTraverser(final Traverser.Admin<S> traverser) {
+        return traverser.get();
     }
 
     @Override
-    public MapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> getMapReduce() {
-        return MaxGlobalMapReduce.instance();
+    public Set<TraverserRequirement> getRequirements() {
+        return Collections.singleton(TraverserRequirement.OBJECT);
     }
 
     /////
 
-    private static class MaxGlobalBiFunction<S extends Number> implements BiFunction<S, Traverser<S>, S>, Serializable {
-
-        private static final MaxGlobalBiFunction INSTANCE = new MaxGlobalBiFunction();
-
-        private MaxGlobalBiFunction() {
-
-        }
-
+    private static class MaxGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
         @Override
-        public S apply(final S mutatingSeed, final Traverser<S> traverser) {
-            final S value = traverser.get();
-            return mutatingSeed != null ? (S) max(mutatingSeed, traverser.get()) : value;
-        }
-
-        public static <S extends Number> MaxGlobalBiFunction<S> instance() {
-            return INSTANCE;
-        }
-    }
-
-    ///////////
-
-    private static class MaxGlobalMapReduce extends StaticMapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> {
-
-        private static final MaxGlobalMapReduce INSTANCE = new MaxGlobalMapReduce();
-
-        private MaxGlobalMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final MapReduce.Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<NullObject, Number> emitter) {
-            final Iterator<Number> values = IteratorUtils.map(vertex.<Set<Traverser.Admin<Number>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).orElse(Collections.emptySet()).iterator(), Traverser.Admin::get);
-            if (values.hasNext())
-                emitter.emit(getMax(values));
-        }
-
-        @Override
-        public void combine(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
-            this.reduce(key, values, emitter);
-        }
-
-        @Override
-        public void reduce(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
-            if (values.hasNext())
-                emitter.emit(getMax(values));
-        }
-
-        private Number getMax(final Iterator<Number> numbers) {
-            Number max = null;
-            while (numbers.hasNext()) {
-                final Number value = numbers.next();
-                max = max != null ? max(value, max) : value;
-            }
-            return max;
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        @Override
-        public Number generateFinalResult(final Iterator<KeyValue<NullObject, Number>> keyValues) {
-            return keyValues.hasNext() ? keyValues.next().getValue() : Double.NaN;
-
-        }
-
-        public static MaxGlobalMapReduce instance() {
-            return INSTANCE;
+        public S apply(final S mutatingSeed, final S number) {
+            return !NAN.equals(mutatingSeed) ? (S) max(mutatingSeed, number) : number;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
index a4e4272..53688cd 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MeanGlobalStep.java
@@ -18,29 +18,20 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.NumberHelper;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.FinalGet;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.function.MeanNumberSupplier;
 
 import java.io.Serializable;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 import java.util.function.Supplier;
 
-import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.add;
 import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.div;
 import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.mul;
 
@@ -48,109 +39,36 @@ import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.mul;
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  * @author Daniel Kuppitz (http://gremlin.guru)
  */
-public final class MeanGlobalStep<S extends Number, E extends Number> extends ReducingBarrierStep<S, E> implements MapReducer {
+public final class MeanGlobalStep<S extends Number, E extends Number> extends ReducingBarrierStep<S, E> {
 
     private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(TraverserRequirement.OBJECT, TraverserRequirement.BULK);
 
     public MeanGlobalStep(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier((Supplier) MeanNumberSupplier.instance());
-        this.setBiFunction((BiFunction) MeanGlobalBiFunction.instance());
+        this.setReducingBiOperator(new MeanGlobalBiOperator<>());
     }
 
     @Override
-    public Set<TraverserRequirement> getRequirements() {
-        return REQUIREMENTS;
+    public E projectTraverser(final Traverser.Admin<S> traverser) {
+        return (E) new MeanNumber(traverser.get(), traverser.bulk());
     }
 
     @Override
-    public MapReduce<Number, Long, Number, Long, Number> getMapReduce() {
-        return MeanGlobalMapReduce.instance();
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
     }
 
     /////
 
-    private static class MeanGlobalBiFunction<S extends Number> implements BiFunction<S, Traverser<S>, S>, Serializable {
-
-        private static final MeanGlobalBiFunction INSTANCE = new MeanGlobalBiFunction();
-
-        private MeanGlobalBiFunction() {
-
-        }
-
-        @Override
-        public S apply(final S mutatingSeed, final Traverser<S> traverser) {
-            return (S) ((MeanNumber) mutatingSeed).add(traverser.get(), traverser.bulk());
-        }
-
-        public static <S extends Number> MeanGlobalBiFunction<S> instance() {
-            return INSTANCE;
-        }
-    }
-
-    ///////////
-
-    private static final class MeanGlobalMapReduce extends StaticMapReduce<Number, Long, Number, Long, Number> {
-
-        private static final MeanGlobalMapReduce INSTANCE = new MeanGlobalMapReduce();
-
-        private MeanGlobalMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final MapReduce.Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<Number, Long> emitter) {
-            vertex.<TraverserSet<Number>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet -> traverserSet.forEach(traverser -> emitter.emit(traverser.get(), traverser.bulk())));
-        }
-
-        @Override
-        public void combine(final Number key, final Iterator<Long> values, final ReduceEmitter<Number, Long> emitter) {
-            this.reduce(key, values, emitter);
-        }
+    private static class MeanGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
 
         @Override
-        public void reduce(final Number key, final Iterator<Long> values, final ReduceEmitter<Number, Long> emitter) {
-            long counter = 0;
-            while (values.hasNext()) {
-                counter = counter + values.next();
-            }
-            emitter.emit(key, counter);
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        @Override
-        public Number generateFinalResult(final Iterator<KeyValue<Number, Long>> keyValues) {
-            if (keyValues.hasNext()) {
-                KeyValue<Number, Long> pair = keyValues.next();
-                long counter = pair.getValue();
-                Number result = mul(pair.getKey(), counter);
-                while (keyValues.hasNext()) {
-                    long incr = pair.getValue();
-                    pair = keyValues.next();
-                    result = add(result, mul(pair.getKey(), incr));
-                    counter += incr;
-                }
-                return div(result, counter, true);
-            }
-            return Double.NaN;
-        }
-
-        public static MeanGlobalMapReduce instance() {
-            return INSTANCE;
+        public S apply(final S mutatingSeed, final S number) {
+            return (S) (number instanceof MeanNumber ? ((MeanNumber) mutatingSeed).add((MeanNumber) number) : ((MeanNumber) mutatingSeed).add(number, 1l));
         }
     }
 
-    ///
-
     public static final class MeanNumber extends Number implements Comparable<Number>, FinalGet<Number> {
 
         private long count;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
index 88dd46f..a1bc2c4 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/MinGlobalStep.java
@@ -18,122 +18,48 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.Serializable;
 import java.util.Collections;
-import java.util.Iterator;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 
 import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.min;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class MinGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> implements MapReducer {
+public final class MinGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> {
+
+    private static final Double NAN = Double.valueOf(Double.NaN);
 
     public MinGlobalStep(final Traversal.Admin traversal) {
         super(traversal);
-        this.setSeedSupplier(new ConstantSupplier<>(null));
-        this.setBiFunction(MinGlobalBiFunction.instance());
+        this.setSeedSupplier(new ConstantSupplier<>((S) NAN));
+        this.setReducingBiOperator(new MinGlobalBiOperator<>());
     }
 
     @Override
-    public Set<TraverserRequirement> getRequirements() {
-        return Collections.singleton(TraverserRequirement.OBJECT);
+    public S projectTraverser(final Traverser.Admin<S> traverser) {
+        return traverser.get();
     }
 
     @Override
-    public MapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> getMapReduce() {
-        return MinGlobalMapReduce.instance();
+    public Set<TraverserRequirement> getRequirements() {
+        return Collections.singleton(TraverserRequirement.OBJECT);
     }
 
     /////
 
-    private static class MinGlobalBiFunction<S extends Number> implements BiFunction<S, Traverser<S>, S>, Serializable {
-
-        private static final MinGlobalBiFunction INSTANCE = new MinGlobalBiFunction();
-
-        private MinGlobalBiFunction() {
-
-        }
-
+    private static class MinGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
         @Override
-        public S apply(final S mutatingSeed, final Traverser<S> traverser) {
-            final S value = traverser.get();
-            return mutatingSeed != null ? (S) min(mutatingSeed, traverser.get()) : value;
-        }
-
-        public static <S extends Number> MinGlobalBiFunction<S> instance() {
-            return INSTANCE;
-        }
-    }
-
-    ///////////
-
-    private static class MinGlobalMapReduce extends StaticMapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> {
-
-        private static final MinGlobalMapReduce INSTANCE = new MinGlobalMapReduce();
-
-        private MinGlobalMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final MapReduce.Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<NullObject, Number> emitter) {
-            final Iterator<Number> values = IteratorUtils.map(vertex.<Set<Traverser.Admin<Number>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).orElse(Collections.emptySet()).iterator(), Traverser.Admin::get);
-            if (values.hasNext())
-                emitter.emit(getMin(values));
-        }
-
-        @Override
-        public void combine(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
-            this.reduce(key, values, emitter);
-        }
-
-        @Override
-        public void reduce(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
-            if (values.hasNext()) emitter.emit(getMin(values));
-        }
-
-        private Number getMin(final Iterator<Number> numbers) {
-            Number min = null;
-            while (numbers.hasNext()) {
-                final Number value = numbers.next();
-                min = min != null ? min(value, min) : value;
-            }
-            return min;
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        @Override
-        public Number generateFinalResult(final Iterator<KeyValue<NullObject, Number>> keyValues) {
-            return keyValues.hasNext() ? keyValues.next().getValue() : Double.NaN;
-        }
-
-        public static MinGlobalMapReduce instance() {
-            return INSTANCE;
+        public S apply(final S mutatingSeed, final S number) {
+            return !NAN.equals(mutatingSeed) ? (S) min(mutatingSeed, number) : number;
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
index 8c13192..bea7717 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/SumGlobalStep.java
@@ -18,25 +18,16 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.util.function.ConstantSupplier;
-import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
 import java.io.Serializable;
-import java.util.Collections;
 import java.util.EnumSet;
-import java.util.Iterator;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 
 import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.add;
 import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.mul;
@@ -44,7 +35,7 @@ import static org.apache.tinkerpop.gremlin.process.traversal.NumberHelper.mul;
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class SumGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> implements MapReducer {
+public final class SumGlobalStep<S extends Number> extends ReducingBarrierStep<S, S> {
 
     private static final Set<TraverserRequirement> REQUIREMENTS = EnumSet.of(
             TraverserRequirement.BULK,
@@ -54,94 +45,27 @@ public final class SumGlobalStep<S extends Number> extends ReducingBarrierStep<S
     public SumGlobalStep(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier(new ConstantSupplier<>((S) Integer.valueOf(0)));
-        this.setBiFunction(SumGlobalBiFunction.instance());
+        this.setReducingBiOperator(new SumGlobalBiOperator<S>());
     }
 
-
     @Override
-    public Set<TraverserRequirement> getRequirements() {
-        return REQUIREMENTS;
+    public S projectTraverser(final Traverser.Admin<S> traverser) {
+        return (S) mul(traverser.get(), traverser.bulk());
     }
 
+
     @Override
-    public MapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> getMapReduce() {
-        return SumGlobalMapReduce.instance();
+    public Set<TraverserRequirement> getRequirements() {
+        return REQUIREMENTS;
     }
 
     /////
 
-    private static class SumGlobalBiFunction<S extends Number> implements BiFunction<S, Traverser<S>, S>, Serializable {
-
-        private static final SumGlobalBiFunction INSTANCE = new SumGlobalBiFunction<>();
-
-        private SumGlobalBiFunction() {
-
-        }
+    private static class SumGlobalBiOperator<S extends Number> implements BinaryOperator<S>, Serializable {
 
         @Override
-        public S apply(final S mutatingSeed, final Traverser<S> traverser) {
-            return (S) add(mutatingSeed, mul(traverser.get(), traverser.bulk()));
-        }
-
-        public static <S extends Number> SumGlobalBiFunction<S> instance() {
-            return INSTANCE;
-        }
-    }
-
-    ///////////
-
-    private static class SumGlobalMapReduce extends StaticMapReduce<MapReduce.NullObject, Number, MapReduce.NullObject, Number, Number> {
-
-        private static final SumGlobalMapReduce INSTANCE = new SumGlobalMapReduce();
-
-        private SumGlobalMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final MapReduce.Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<NullObject, Number> emitter) {
-            final Iterator<Number> values = IteratorUtils.map(vertex.<Set<Traverser.Admin<Number>>>property(TraversalVertexProgram.HALTED_TRAVERSERS).orElse(Collections.emptySet()).iterator(),
-                    traverser -> mul(traverser.get(), traverser.bulk()));
-            if (values.hasNext())
-                emitter.emit(getSum(values));
-        }
-
-        @Override
-        public void combine(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
-            this.reduce(key, values, emitter);
-        }
-
-        @Override
-        public void reduce(final NullObject key, final Iterator<Number> values, final ReduceEmitter<NullObject, Number> emitter) {
-            if (values.hasNext())
-                emitter.emit(getSum(values));
-        }
-
-        private Number getSum(final Iterator<Number> numbers) {
-            Number sum = numbers.next();
-            while (numbers.hasNext()) {
-                sum = add(sum, numbers.next());
-            }
-            return sum;
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        @Override
-        public Number generateFinalResult(final Iterator<KeyValue<NullObject, Number>> keyValues) {
-            return keyValues.hasNext() ? keyValues.next().getValue() : 0;
-        }
-
-        public static SumGlobalMapReduce instance() {
-            return INSTANCE;
+        public S apply(final S mutatingSeed, final S number) {
+            return (S) add(mutatingSeed, number);
         }
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
index 07d8112..fa41210 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/map/TreeStep.java
@@ -18,45 +18,37 @@
  */
 package org.apache.tinkerpop.gremlin.process.traversal.step.map;
 
-import org.apache.tinkerpop.gremlin.process.computer.KeyValue;
-import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.tinkerpop.gremlin.process.computer.traversal.TraversalVertexProgram;
-import org.apache.tinkerpop.gremlin.process.computer.util.StaticMapReduce;
 import org.apache.tinkerpop.gremlin.process.traversal.Path;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.ByModulating;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.step.PathProcessor;
 import org.apache.tinkerpop.gremlin.process.traversal.step.TraversalParent;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.ReducingBarrierStep;
 import org.apache.tinkerpop.gremlin.process.traversal.step.util.Tree;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.TraverserRequirement;
-import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalRing;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalUtil;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.function.TreeSupplier;
 
 import java.io.Serializable;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Set;
-import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 import java.util.function.Supplier;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
-public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements MapReducer, TraversalParent, ByModulating, PathProcessor {
+public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements TraversalParent, ByModulating, PathProcessor {
 
     private TraversalRing<Object, Object> traversalRing = new TraversalRing<>();
 
     public TreeStep(final Traversal.Admin traversal) {
         super(traversal);
         this.setSeedSupplier((Supplier) TreeSupplier.instance());
-        this.setBiFunction(new TreeBiFunction(this));
+        this.setReducingBiOperator(new TreeBiOperator());
     }
 
 
@@ -76,16 +68,26 @@ public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements M
     }
 
     @Override
-    public MapReduce<MapReduce.NullObject, Tree, MapReduce.NullObject, Tree, Tree> getMapReduce() {
-        return TreeMapReduce.instance();
+    public Tree projectTraverser(final Traverser.Admin<S> traverser) {
+        final Tree topTree = new Tree();
+        Tree depth = topTree;
+        final Path path = traverser.path();
+        for (int i = 0; i < path.size(); i++) {
+            final Object object = TraversalUtil.apply(path.<Object>get(i), this.traversalRing.next());
+            if (!depth.containsKey(object))
+                depth.put(object, new Tree<>());
+            depth = (Tree) depth.get(object);
+        }
+        this.traversalRing.reset();
+        return topTree;
     }
 
+
     @Override
     public TreeStep<S> clone() {
         final TreeStep<S> clone = (TreeStep<S>) super.clone();
         clone.traversalRing = this.traversalRing.clone();
         clone.getLocalChildren().forEach(clone::integrateChild);
-        clone.setBiFunction(new TreeBiFunction<>(clone));
         return clone;
     }
 
@@ -107,73 +109,12 @@ public final class TreeStep<S> extends ReducingBarrierStep<S, Tree> implements M
 
     ///////////
 
-    private static class TreeBiFunction<S> implements BiFunction<Tree, Traverser<S>, Tree>, Serializable {
-
-        private final TreeStep<S> treeStep;
-
-        private TreeBiFunction(final TreeStep<S> treeStep) {
-            this.treeStep = treeStep;
-        }
+    private static class TreeBiOperator implements BinaryOperator<Tree>, Serializable {
 
         @Override
-        public Tree apply(final Tree mutatingSeed, final Traverser<S> traverser) {
-            Tree depth = mutatingSeed;
-            final Path path = traverser.path();
-            for (int i = 0; i < path.size(); i++) {
-                final Object object = TraversalUtil.apply(path.<Object>get(i), this.treeStep.traversalRing.next());
-                if (!depth.containsKey(object))
-                    depth.put(object, new Tree<>());
-                depth = (Tree) depth.get(object);
-            }
-            this.treeStep.traversalRing.reset();
+        public Tree apply(final Tree mutatingSeed, final Tree tree) {
+            mutatingSeed.addTree(tree);
             return mutatingSeed;
         }
     }
-
-    ///////////
-
-    public static final class TreeMapReduce extends StaticMapReduce<MapReduce.NullObject, Tree, MapReduce.NullObject, Tree, Tree> {
-
-        private static final TreeMapReduce INSTANCE = new TreeMapReduce();
-
-        private TreeMapReduce() {
-
-        }
-
-        @Override
-        public boolean doStage(final Stage stage) {
-            return true;
-        }
-
-        @Override
-        public void map(final Vertex vertex, final MapEmitter<NullObject, Tree> emitter) {
-            vertex.<TraverserSet<Tree>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet -> traverserSet.forEach(traverser -> emitter.emit(traverser.get())));
-        }
-
-        @Override
-        public void combine(final NullObject key, final Iterator<Tree> values, final ReduceEmitter<NullObject, Tree> emitter) {
-            this.reduce(key, values, emitter);
-        }
-
-        @Override
-        public void reduce(final NullObject key, final Iterator<Tree> values, final ReduceEmitter<NullObject, Tree> emitter) {
-            final Tree tree = new Tree();
-            values.forEachRemaining(tree::addTree);
-            emitter.emit(tree);
-        }
-
-        @Override
-        public Tree generateFinalResult(final Iterator<KeyValue<NullObject, Tree>> keyValues) {
-            return keyValues.hasNext() ? keyValues.next().getValue() : new Tree();
-        }
-
-        @Override
-        public String getMemoryKey() {
-            return REDUCING;
-        }
-
-        public static final TreeMapReduce instance() {
-            return INSTANCE;
-        }
-    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/65bbdaa3/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
index ea94499..921385c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/step/util/ReducingBarrierStep.java
@@ -28,7 +28,6 @@ import org.apache.tinkerpop.gremlin.process.traversal.Step;
 import org.apache.tinkerpop.gremlin.process.traversal.Traversal;
 import org.apache.tinkerpop.gremlin.process.traversal.Traverser;
 import org.apache.tinkerpop.gremlin.process.traversal.step.Barrier;
-import org.apache.tinkerpop.gremlin.process.traversal.step.MapReducer;
 import org.apache.tinkerpop.gremlin.process.traversal.traverser.util.TraverserSet;
 import org.apache.tinkerpop.gremlin.process.traversal.util.FastNoSuchElementException;
 import org.apache.tinkerpop.gremlin.process.traversal.util.TraversalHelper;
@@ -37,21 +36,21 @@ import org.apache.tinkerpop.gremlin.structure.Vertex;
 
 import java.util.Iterator;
 import java.util.function.BiFunction;
+import java.util.function.BinaryOperator;
 import java.util.function.Supplier;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
  */
 
-public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> implements MapReducer, Barrier {
+public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> implements Barrier {
 
 
     public static final String REDUCING = Graph.Hidden.hide("reducing");
 
     protected Supplier<E> seedSupplier;
-    protected BiFunction<E, Traverser<S>, E> reducingBiFunction;
+    protected BinaryOperator<E> reducingBiOperator;
     private boolean done = false;
-
     private E seed = null;
 
     public ReducingBarrierStep(final Traversal.Admin traversal) {
@@ -62,8 +61,18 @@ public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> imple
         this.seedSupplier = seedSupplier;
     }
 
-    public void setBiFunction(final BiFunction<E, Traverser<S>, E> reducingBiFunction) {
-        this.reducingBiFunction = reducingBiFunction;
+    public Supplier<E> getSeedSupplier() {
+        return this.seedSupplier;
+    }
+
+    public abstract E projectTraverser(final Traverser.Admin<S> traverser);
+
+    public void setReducingBiOperator(final BinaryOperator<E> reducingBiOperator) {
+        this.reducingBiOperator = reducingBiOperator;
+    }
+
+    public BinaryOperator<E> getBiOperator() {
+        return this.reducingBiOperator;
     }
 
     public void reset() {
@@ -90,7 +99,7 @@ public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> imple
     public void processAllStarts() {
         if (this.seed == null) this.seed = this.seedSupplier.get();
         while (this.starts.hasNext())
-            this.seed = this.reducingBiFunction.apply(this.seed, this.starts.next());
+            this.seed = this.reducingBiOperator.apply(this.seed, this.projectTraverser(this.starts.next()));
     }
 
     @Override
@@ -112,11 +121,6 @@ public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> imple
         return clone;
     }
 
-    @Override
-    public MapReduce getMapReduce() {
-        return new DefaultMapReduce(this.seedSupplier, this.reducingBiFunction);
-    }
-
     ///////
 
     public static class DefaultMapReduce extends StaticMapReduce<MapReduce.NullObject, Object, MapReduce.NullObject, Object, Object> {
@@ -163,7 +167,6 @@ public abstract class ReducingBarrierStep<S, E> extends AbstractStep<S, E> imple
         @Override
         public Object generateFinalResult(final Iterator keyValues) {
             return ((KeyValue) keyValues.next()).getValue();
-
         }
 
         @Override