You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/02/13 16:35:06 UTC

[12/17] incubator-tinkerpop git commit: Found a bug in TraverserMapReduce where the combiner should not just call reduce(). Only Giraph was able to expose it because it actually calls combine(). The test dataset is so small that Spark doesn't even kick it off. [...]

Found a bug in TraverserMapReduce where the combiner should not just call reduce(). Only Giraph was able to expose it because it actually calls combine(). The test dataset is so small that Spark doesn't even kick it off. TinkerGraph, because it's in-memory, doesn't use combine. Wow. That was a two-day bug. However, I have written so many more test cases. Things are really pretty. And that ends my week with g.V.pageRank().order().by().limit(10) working. Phew.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/0a224303
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/0a224303
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/0a224303

Branch: refs/heads/master
Commit: 0a2243037792854c6ef9f834f41785e7a728ee7e
Parents: cd1a379
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Fri Feb 12 14:02:46 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Fri Feb 12 14:02:46 2016 -0700

----------------------------------------------------------------------
 .../gremlin/process/computer/GraphComputer.java |   4 +
 .../traversal/step/map/ComputerResultStep.java  |   7 ++
 .../mapreduce/TraverserMapReduce.java           |  23 +++-
 .../process/computer/GraphComputerTest.java     |  35 +++++-
 .../computer/AbstractHadoopGraphComputer.java   |   5 +
 .../process/computer/util/MapReduceHelper.java  | 122 +++++++++----------
 .../process/computer/TinkerGraphComputer.java   |  63 +++++-----
 7 files changed, 160 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0a224303/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
index ddb09a0..62fc260 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputer.java
@@ -272,6 +272,10 @@ public interface GraphComputer {
         public static IllegalArgumentException edgeFilterAccessesAdjacentVertices(final Traversal<Vertex, Edge> edgeFilter) {
             return new IllegalArgumentException("The provided edge filter traversal accesses data on adjacent vertices: " + edgeFilter);
         }
+
+        public static IllegalStateException mapReduceJobsMustHaveAMapStage(final MapReduce mapReduce) {
+            throw new IllegalStateException("MapReduce jobs must have a map stage: " + mapReduce);
+        }
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0a224303/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
index bf5586b..86dce91 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/map/ComputerResultStep.java
@@ -107,4 +107,11 @@ public final class ComputerResultStep<S> extends AbstractStep<ComputerResult, S>
     public Set<TraverserRequirement> getRequirements() {
         return EnumSet.of(TraverserRequirement.OBJECT);
     }
+
+    @Override
+    public ComputerResultStep<S> clone() {
+        final ComputerResultStep<S> clone = (ComputerResultStep<S>)super.clone();
+        clone.currentIterator = EmptyIterator.instance();
+        return clone;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0a224303/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
index 07294f9..e68cd3f 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/traversal/step/sideEffect/mapreduce/TraverserMapReduce.java
@@ -38,6 +38,7 @@ import org.apache.tinkerpop.gremlin.structure.Edge;
 import org.apache.tinkerpop.gremlin.structure.Graph;
 import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.tinkerpop.gremlin.structure.util.Attachable;
+import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
 import org.apache.tinkerpop.gremlin.util.function.ChainedComparator;
 import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 
@@ -119,9 +120,11 @@ public final class TraverserMapReduce implements MapReduce<Comparable, Traverser
 
     @Override
     public void map(final Vertex vertex, final MapEmitter<Comparable, Traverser<?>> emitter) {
-        vertex.<TraverserSet<Object>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet -> IteratorUtils.removeOnNext(traverserSet.iterator()).forEachRemaining(traverser -> {
-            if (this.attachHaltedTraverser && !(traverser.get() instanceof Edge))
+        vertex.<TraverserSet<Object>>property(TraversalVertexProgram.HALTED_TRAVERSERS).ifPresent(traverserSet -> traverserSet.forEach(traverser -> {
+            if (this.attachHaltedTraverser && !(traverser.get() instanceof Edge)) {
+                traverser = traverser.clone().asAdmin();
                 traverser.attach(Attachable.Method.get(vertex));
+            }
             if (null != this.comparator)    // TODO: I think we shouldn't ever single key it  -- always double emit to load balance the servers.
                 emitter.emit(traverser, traverser);
             else
@@ -136,7 +139,16 @@ public final class TraverserMapReduce implements MapReduce<Comparable, Traverser
 
     @Override
     public void combine(final Comparable comparable, final Iterator<Traverser<?>> values, final ReduceEmitter<Comparable, Traverser<?>> emitter) {
-        this.reduce(comparable, values, emitter);
+        final TraverserSet<?> traverserSet = new TraverserSet<>();
+        while (values.hasNext()) {
+            traverserSet.add((Traverser.Admin) values.next().asAdmin());
+        }
+        IteratorUtils.removeOnNext(traverserSet.iterator()).forEachRemaining(traverser -> {
+            if (null != this.comparator)
+                emitter.emit(traverser, traverser);
+            else
+                emitter.emit(traverser);
+        });
     }
 
     @Override
@@ -181,4 +193,9 @@ public final class TraverserMapReduce implements MapReduce<Comparable, Traverser
     public String getMemoryKey() {
         return TRAVERSERS;
     }
+
+    @Override
+    public String toString() {
+        return StringFactory.mapReduceString(this, this.traversal.toString());
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0a224303/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
----------------------------------------------------------------------
diff --git a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
index 23ef0a7..c910744 100644
--- a/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
+++ b/gremlin-test/src/main/java/org/apache/tinkerpop/gremlin/process/computer/GraphComputerTest.java
@@ -77,7 +77,8 @@ import static org.junit.Assert.fail;
         "vertexPropertiesCanNotBeUpdatedInMapReduce",
         "computerRequiresMoreWorkersThanSupported",
         "vertexFilterAccessesIncidentEdges",
-        "edgeFilterAccessesAdjacentVertices"
+        "edgeFilterAccessesAdjacentVertices",
+        "mapReduceJobsMustHaveAMapStage"
 })
 @ExceptionCoverage(exceptionClass = Graph.Exceptions.class, methods = {
         "graphDoesNotSupportProvidedGraphComputer"
@@ -103,6 +104,35 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         }
     }
 
+    @Test
+    @LoadGraphWith(MODERN)
+    public void shouldNotAllowMapReduceJobsWithoutAMapStage() throws Exception {
+        try {
+            graphProvider.getGraphComputer(graph).mapReduce(new BadMapReduce()).submit().get();
+            fail("Should throw an IllegalStateException saying that MapReduce jobs must have a map stage");
+        } catch (Exception e) {
+            assertTrue(true);
+        }
+    }
+
+    public static class BadMapReduce extends StaticMapReduce {
+
+        @Override
+        public boolean doStage(final Stage stage) {
+            return false;
+        }
+
+        @Override
+        public String getMemoryKey() {
+            return "nothing";
+        }
+
+        @Override
+        public Object generateFinalResult(final Iterator keyValues) {
+            return new Object();
+        }
+    }
+
     /////////////////////////////////////////////
     @Test
     @LoadGraphWith(MODERN)
@@ -1859,8 +1889,9 @@ public class GraphComputerTest extends AbstractGremlinProcessTest {
         assertEquals(6, graph2.traversal().V().values(PageRankVertexProgram.PAGE_RANK).count().next().intValue());
         assertEquals(6, graph2.traversal().V().values(PageRankVertexProgram.EDGE_COUNT).count().next().intValue());
         //
+        // TODO: Need to solve the chicken and the egg problem with how TraversalVertexPrograms and strategies play on each other.
         final ComputerResult result3 = graph2.compute(graphProvider.getGraphComputer(graph2).getClass())
-                .program(TraversalVertexProgram.build().traversal(graph2.traversal().V().groupCount("m").by(__.values(PageRankVertexProgram.PAGE_RANK).count()).label().asAdmin()).create(graph2)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
+                .program(TraversalVertexProgram.build().traversal(__.V().groupCount("m").by(__.values(PageRankVertexProgram.PAGE_RANK).count()).label().asAdmin()).create(graph2)).persist(GraphComputer.Persist.EDGES).result(GraphComputer.ResultGraph.NEW).submit().get();
         final Graph graph3 = result3.graph();
         final Memory memory3 = result3.memory();
         assertTrue(memory3.keys().contains("m"));

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0a224303/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
index ac83dd6..24455af 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/AbstractHadoopGraphComputer.java
@@ -133,6 +133,11 @@ public abstract class AbstractHadoopGraphComputer implements GraphComputer {
         // if too many workers are requested, throw appropriate exception
         if (this.workers > this.features().getMaxWorkers())
             throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.workers, this.features().getMaxWorkers());
+        // all map reducers must have a map stage
+        for (final MapReduce mapReduce : this.mapReducers) {
+            if (!mapReduce.doStage(MapReduce.Stage.MAP))
+                throw GraphComputer.Exceptions.mapReduceJobsMustHaveAMapStage(mapReduce);
+        }
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0a224303/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
index 364e71a..cefcaf1 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/util/MapReduceHelper.java
@@ -68,74 +68,70 @@ public final class MapReduceHelper {
         apacheConfiguration.setDelimiterParsingDisabled(true);
         mapReduce.storeState(apacheConfiguration);
         ConfUtil.mergeApacheIntoHadoopConfiguration(apacheConfiguration, newConfiguration);
-        if (!mapReduce.doStage(MapReduce.Stage.MAP)) {
-            final Path memoryPath = new Path(Constants.getMemoryLocation(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), mapReduce.getMemoryKey()));
-            mapReduce.addResultToMemory(memory, new ObjectWritableIterator(newConfiguration, memoryPath));
+
+        final Optional<Comparator<?>> mapSort = mapReduce.getMapKeySort();
+        final Optional<Comparator<?>> reduceSort = mapReduce.getReduceKeySort();
+        newConfiguration.setClass(Constants.GREMLIN_HADOOP_MAP_REDUCE_CLASS, mapReduce.getClass(), MapReduce.class);
+        final Job job = Job.getInstance(newConfiguration, mapReduce.toString());
+        HadoopGraph.LOGGER.info(Constants.GREMLIN_HADOOP_JOB_PREFIX + mapReduce.toString());
+        job.setJarByClass(HadoopGraph.class);
+        if (mapSort.isPresent())
+            job.setSortComparatorClass(ObjectWritableComparator.ObjectWritableMapComparator.class);
+        job.setMapperClass(HadoopMap.class);
+        if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
+            if (mapReduce.doStage(MapReduce.Stage.COMBINE))
+                job.setCombinerClass(HadoopCombine.class);
+            job.setReducerClass(HadoopReduce.class);
         } else {
-            final Optional<Comparator<?>> mapSort = mapReduce.getMapKeySort();
-            final Optional<Comparator<?>> reduceSort = mapReduce.getReduceKeySort();
-            newConfiguration.setClass(Constants.GREMLIN_HADOOP_MAP_REDUCE_CLASS, mapReduce.getClass(), MapReduce.class);
-            final Job job = Job.getInstance(newConfiguration, mapReduce.toString());
-            HadoopGraph.LOGGER.info(Constants.GREMLIN_HADOOP_JOB_PREFIX + mapReduce.toString());
-            job.setJarByClass(HadoopGraph.class);
-            if (mapSort.isPresent())
-                job.setSortComparatorClass(ObjectWritableComparator.ObjectWritableMapComparator.class);
-            job.setMapperClass(HadoopMap.class);
-            if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
-                if (mapReduce.doStage(MapReduce.Stage.COMBINE))
-                    job.setCombinerClass(HadoopCombine.class);
-                job.setReducerClass(HadoopReduce.class);
-            } else {
-                if (mapSort.isPresent()) {
-                    job.setReducerClass(Reducer.class);
-                    job.setNumReduceTasks(1); // todo: is this necessary to ensure sorted order?
-                } else {
-                    job.setNumReduceTasks(0);
-                }
-            }
-            job.setMapOutputKeyClass(ObjectWritable.class);
-            job.setMapOutputValueClass(ObjectWritable.class);
-            job.setOutputKeyClass(ObjectWritable.class);
-            job.setOutputValueClass(ObjectWritable.class);
-            job.setInputFormatClass(GraphFilterInputFormat.class);
-            job.setOutputFormatClass(SequenceFileOutputFormat.class);
-            // if there is no vertex program, then grab the graph from the input location
-            final Path graphPath;
-            if (vertexProgramExists) {
-                graphPath = new Path(Constants.getGraphLocation(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)));
+            if (mapSort.isPresent()) {
+                job.setReducerClass(Reducer.class);
+                job.setNumReduceTasks(1); // todo: is this necessary to ensure sorted order?
             } else {
-                graphPath = new Path(newConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
+                job.setNumReduceTasks(0);
             }
+        }
+        job.setMapOutputKeyClass(ObjectWritable.class);
+        job.setMapOutputValueClass(ObjectWritable.class);
+        job.setOutputKeyClass(ObjectWritable.class);
+        job.setOutputValueClass(ObjectWritable.class);
+        job.setInputFormatClass(GraphFilterInputFormat.class);
+        job.setOutputFormatClass(SequenceFileOutputFormat.class);
+        // if there is no vertex program, then grab the graph from the input location
+        final Path graphPath;
+        if (vertexProgramExists) {
+            graphPath = new Path(Constants.getGraphLocation(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION)));
+        } else {
+            graphPath = new Path(newConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION));
+        }
 
-            Path memoryPath = new Path(Constants.getMemoryLocation(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), (reduceSort.isPresent() ? mapReduce.getMemoryKey() + "-temp" : mapReduce.getMemoryKey())));
-            if (FileSystem.get(newConfiguration).exists(memoryPath)) {
-                FileSystem.get(newConfiguration).delete(memoryPath, true);
-            }
-            FileInputFormat.setInputPaths(job, graphPath);
-            FileOutputFormat.setOutputPath(job, memoryPath);
-            job.waitForCompletion(true);
+        Path memoryPath = new Path(Constants.getMemoryLocation(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), (reduceSort.isPresent() ? mapReduce.getMemoryKey() + "-temp" : mapReduce.getMemoryKey())));
+        if (FileSystem.get(newConfiguration).exists(memoryPath)) {
+            FileSystem.get(newConfiguration).delete(memoryPath, true);
+        }
+        FileInputFormat.setInputPaths(job, graphPath);
+        FileOutputFormat.setOutputPath(job, memoryPath);
+        job.waitForCompletion(true);
 
-            // if there is a reduce sort, we need to run another identity MapReduce job
-            if (reduceSort.isPresent()) {
-                final Job reduceSortJob = Job.getInstance(newConfiguration, "ReduceKeySort");
-                reduceSortJob.setSortComparatorClass(ObjectWritableComparator.ObjectWritableReduceComparator.class);
-                reduceSortJob.setMapperClass(Mapper.class);
-                reduceSortJob.setReducerClass(Reducer.class);
-                reduceSortJob.setMapOutputKeyClass(ObjectWritable.class);
-                reduceSortJob.setMapOutputValueClass(ObjectWritable.class);
-                reduceSortJob.setOutputKeyClass(ObjectWritable.class);
-                reduceSortJob.setOutputValueClass(ObjectWritable.class);
-                reduceSortJob.setInputFormatClass(SequenceFileInputFormat.class);
-                reduceSortJob.setOutputFormatClass(SequenceFileOutputFormat.class);
-                reduceSortJob.setNumReduceTasks(1); // todo: is this necessary to ensure sorted order?
-                FileInputFormat.setInputPaths(reduceSortJob, memoryPath);
-                final Path sortedMemoryPath = new Path(Constants.getMemoryLocation(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), mapReduce.getMemoryKey()));
-                FileOutputFormat.setOutputPath(reduceSortJob, sortedMemoryPath);
-                reduceSortJob.waitForCompletion(true);
-                FileSystem.get(newConfiguration).delete(memoryPath, true); // delete the temporary memory path
-                memoryPath = sortedMemoryPath;
-            }
-            mapReduce.addResultToMemory(memory, new ObjectWritableIterator(newConfiguration, memoryPath));
+        // if there is a reduce sort, we need to run another identity MapReduce job
+        if (reduceSort.isPresent()) {
+            final Job reduceSortJob = Job.getInstance(newConfiguration, "ReduceKeySort");
+            reduceSortJob.setSortComparatorClass(ObjectWritableComparator.ObjectWritableReduceComparator.class);
+            reduceSortJob.setMapperClass(Mapper.class);
+            reduceSortJob.setReducerClass(Reducer.class);
+            reduceSortJob.setMapOutputKeyClass(ObjectWritable.class);
+            reduceSortJob.setMapOutputValueClass(ObjectWritable.class);
+            reduceSortJob.setOutputKeyClass(ObjectWritable.class);
+            reduceSortJob.setOutputValueClass(ObjectWritable.class);
+            reduceSortJob.setInputFormatClass(SequenceFileInputFormat.class);
+            reduceSortJob.setOutputFormatClass(SequenceFileOutputFormat.class);
+            reduceSortJob.setNumReduceTasks(1); // todo: is this necessary to ensure sorted order?
+            FileInputFormat.setInputPaths(reduceSortJob, memoryPath);
+            final Path sortedMemoryPath = new Path(Constants.getMemoryLocation(newConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION), mapReduce.getMemoryKey()));
+            FileOutputFormat.setOutputPath(reduceSortJob, sortedMemoryPath);
+            reduceSortJob.waitForCompletion(true);
+            FileSystem.get(newConfiguration).delete(memoryPath, true); // delete the temporary memory path
+            memoryPath = sortedMemoryPath;
         }
+        mapReduce.addResultToMemory(memory, new ObjectWritableIterator(newConfiguration, memoryPath));
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/0a224303/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index d38da14..c9bf7ec 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -131,7 +131,10 @@ public final class TinkerGraphComputer implements GraphComputer {
         // ensure requested workers are not larger than supported workers
         if (this.workers > this.features().getMaxWorkers())
             throw GraphComputer.Exceptions.computerRequiresMoreWorkersThanSupported(this.workers, this.features().getMaxWorkers());
-
+        for (final MapReduce mapReduce : this.mapReducers) {
+            if (!mapReduce.doStage(MapReduce.Stage.MAP))
+                throw GraphComputer.Exceptions.mapReduceJobsMustHaveAMapStage(mapReduce);
+        }
 
         // initialize the memory
         this.memory = new TinkerMemory(this.vertexProgram, this.mapReducers);
@@ -177,40 +180,38 @@ public final class TinkerGraphComputer implements GraphComputer {
 
                 // execute mapreduce jobs
                 for (final MapReduce mapReduce : mapReducers) {
-                    if (mapReduce.doStage(MapReduce.Stage.MAP)) {
-                        final TinkerMapEmitter<?, ?> mapEmitter = new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
-                        final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
-                        workers.setMapReduce(mapReduce);
+                    final TinkerMapEmitter<?, ?> mapEmitter = new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
+                    final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(this.graph.vertices());
+                    workers.setMapReduce(mapReduce);
+                    workers.executeMapReduce(workerMapReduce -> {
+                        workerMapReduce.workerStart(MapReduce.Stage.MAP);
+                        while (true) {
+                            final Vertex vertex = vertices.next();
+                            if (null == vertex) break;
+                            workerMapReduce.map(ComputerGraph.mapReduce(vertex), mapEmitter);
+                        }
+                        workerMapReduce.workerEnd(MapReduce.Stage.MAP);
+                    });
+                    // sort results if a map output sort is defined
+                    mapEmitter.complete(mapReduce);
+
+                    // no need to run combiners as this is single machine
+                    if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
+                        final TinkerReduceEmitter<?, ?> reduceEmitter = new TinkerReduceEmitter<>();
+                        final SynchronizedIterator<Map.Entry<?, Queue<?>>> keyValues = new SynchronizedIterator((Iterator) mapEmitter.reduceMap.entrySet().iterator());
                         workers.executeMapReduce(workerMapReduce -> {
-                            workerMapReduce.workerStart(MapReduce.Stage.MAP);
+                            workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
                             while (true) {
-                                final Vertex vertex = vertices.next();
-                                if (null == vertex) break;
-                                workerMapReduce.map(ComputerGraph.mapReduce(vertex), mapEmitter);
+                                final Map.Entry<?, Queue<?>> entry = keyValues.next();
+                                if (null == entry) break;
+                                workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
                             }
-                            workerMapReduce.workerEnd(MapReduce.Stage.MAP);
+                            workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
                         });
-                        // sort results if a map output sort is defined
-                        mapEmitter.complete(mapReduce);
-
-                        // no need to run combiners as this is single machine
-                        if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
-                            final TinkerReduceEmitter<?, ?> reduceEmitter = new TinkerReduceEmitter<>();
-                            final SynchronizedIterator<Map.Entry<?, Queue<?>>> keyValues = new SynchronizedIterator((Iterator) mapEmitter.reduceMap.entrySet().iterator());
-                            workers.executeMapReduce(workerMapReduce -> {
-                                workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
-                                while (true) {
-                                    final Map.Entry<?, Queue<?>> entry = keyValues.next();
-                                    if (null == entry) break;
-                                    workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
-                                }
-                                workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
-                            });
-                            reduceEmitter.complete(mapReduce); // sort results if a reduce output sort is defined
-                            mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
-                        } else {
-                            mapReduce.addResultToMemory(this.memory, mapEmitter.mapQueue.iterator());
-                        }
+                        reduceEmitter.complete(mapReduce); // sort results if a reduce output sort is defined
+                        mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
+                    } else {
+                        mapReduce.addResultToMemory(this.memory, mapEmitter.mapQueue.iterator());
                     }
                 }
                 // update runtime and return the newly computed graph