You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/19 18:17:41 UTC

incubator-tinkerpop git commit: MapReduce now has a worker start/end method. Fixed up TinkerGraphComputer, SparkGraphComputer, and GiraphGraphComputer to support the new method. Fixed a bug in Spark message passing.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 54ac15059 -> b563dc3d7


MapReduce now has a worker start/end method. Fixed up TinkerGraphComputer, SparkGraphComputer, and GiraphGraphComputer to support the new method. Fixed a bug in Spark message passing.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b563dc3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b563dc3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b563dc3d

Branch: refs/heads/master
Commit: b563dc3d7cdb5829ba1d1a5e33483467fa92e39b
Parents: 54ac150
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Mar 19 11:17:37 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Mar 19 11:17:37 2015 -0600

----------------------------------------------------------------------
 .../gremlin/process/computer/MapReduce.java     | 28 ++++++++-
 .../gremlin/process/computer/VertexProgram.java |  8 +--
 .../process/computer/util/MapReducePool.java    | 60 +++++++-------------
 .../computer/util/VertexProgramPool.java        | 51 +++++------------
 .../hadoop/process/computer/HadoopCombine.java  | 21 +++----
 .../hadoop/process/computer/HadoopMap.java      | 10 +++-
 .../hadoop/process/computer/HadoopReduce.java   | 20 +++----
 .../computer/spark/util/SparkHelper.java        | 32 +++++++----
 .../structure/io/ObjectWritableComparator.java  |  4 +-
 .../process/computer/TinkerGraphComputer.java   |  4 ++
 .../process/computer/TinkerWorkerPool.java      | 13 +++++
 11 files changed, 128 insertions(+), 123 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
index f40ce0d..91e3afe 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
@@ -18,8 +18,8 @@
  */
 package org.apache.tinkerpop.gremlin.process.computer;
 
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 
 import java.io.IOException;
 import java.io.ObjectInputStream;
@@ -121,6 +121,30 @@ public interface MapReduce<MK, MV, RK, RV, R> extends Cloneable {
     }
 
     /**
+     * This method is called at the start of the respective {@link MapReduce.Stage} for a particular "chunk of vertices."
+     * The set of vertices in the graph is typically not processed with full parallelism.
+     * The vertex set is split into subsets and a worker is assigned to call the MapReduce methods on each subset.
+     * The default implementation is a no-op.
+     *
+     * @param stage the stage of the MapReduce computation
+     */
+    public default void workerStart(final Stage stage) {
+
+    }
+
+    /**
+     * This method is called at the end of the respective {@link MapReduce.Stage} for a particular "chunk of vertices."
+     * The set of vertices in the graph is typically not processed with full parallelism.
+     * The vertex set is split into subsets and a worker is assigned to call the MapReduce methods on each subset.
+     * The default implementation is a no-op.
+     *
+     * @param stage the stage of the MapReduce computation
+     */
+    public default void workerEnd(final Stage stage) {
+
+    }
+
+    /**
      * If a {@link Comparator} is provided, then all pairs leaving the {@link MapEmitter} are sorted.
      * The sorted results are either fed sorted to the combine/reduce-stage or as the final output.
      * If sorting is not required, then {@link Optional#empty} should be returned as sorting is computationally expensive.
@@ -187,7 +211,7 @@ public interface MapReduce<MK, MV, RK, RV, R> extends Cloneable {
      * @param configuration A configuration with requisite information to build a MapReduce
      * @return the newly constructed MapReduce
      */
-    public static <M extends MapReduce<MK, MV, RK, RV, R>, MK, MV, RK, RV, R> M createMapReduce(final Configuration configuration) {
+    public static <M extends MapReduce> M createMapReduce(final Configuration configuration) {
         try {
             final Class<M> mapReduceClass = (Class) Class.forName(configuration.getString(MAP_REDUCE));
             final Constructor<M> constructor = mapReduceClass.getDeclaredConstructor();

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
index e93321d..4cfebb2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
@@ -19,8 +19,8 @@
 
 package org.apache.tinkerpop.gremlin.process.computer;
 
-import org.apache.tinkerpop.gremlin.structure.Vertex;
 import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
 
 import java.lang.reflect.Constructor;
 import java.util.Collections;
@@ -45,6 +45,8 @@ public interface VertexProgram<M> extends Cloneable {
 
     public static final String VERTEX_PROGRAM = "gremlin.vertexProgram";
 
+    public enum Status {SUCCESS, FAILURE}
+
     /**
      * When it is necessary to store the state of the VertexProgram, this method is called.
      * This is typically required when the VertexProgram needs to be serialized to another machine.
@@ -106,7 +108,6 @@ public interface VertexProgram<M> extends Cloneable {
      * This method is called at the start of each iteration of each "computational chunk."
      * The set of vertices in the graph are typically not processed with full parallelism.
      * The vertex set is split into subsets and a worker is assigned to call the {@link VertexProgram#execute} method.
-     * The typical use is to create static VertexProgram state that exists for the iteration of the vertex subset.
      * The default implementation is a no-op.
      *
      * @param memory The memory at the start of the iteration.
@@ -119,10 +120,9 @@ public interface VertexProgram<M> extends Cloneable {
      * This method is called at the end of each iteration of each "computational chunk."
      * The set of vertices in the graph are typically not processed with full parallelism.
      * The vertex set is split into subsets and a worker is assigned to call the {@link VertexProgram#execute} method.
-     * The typical use is to destroy static VertexProgram state that existed during the iteration of the vertex subset.
      * The default implementation is a no-op.
      *
-     * @param memory The memory at the start of the iteration.
+     * @param memory The memory at the end of the iteration.
      */
     public default void workerIterationEnd(final Memory memory) {
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapReducePool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapReducePool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapReducePool.java
index 167b01a..1efe26c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapReducePool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapReducePool.java
@@ -19,6 +19,7 @@
 package org.apache.tinkerpop.gremlin.process.computer.util;
 
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
 
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
@@ -28,61 +29,40 @@ import java.util.concurrent.TimeUnit;
  */
 public final class MapReducePool {
     private final LinkedBlockingQueue<MapReduce<?, ?, ?, ?, ?>> pool;
-    private final StaticMapReduce mapReduce;
     private static final int TIMEOUT_MS = 2500;
 
     public MapReducePool(final MapReduce mapReduce, final int poolSize) {
-        if (mapReduce instanceof StaticMapReduce) {
-            this.pool = null;
-            this.mapReduce = (StaticMapReduce) mapReduce;
-        } else {
-            this.mapReduce = null;
-            this.pool = new LinkedBlockingQueue<>(poolSize);
-            while (this.pool.remainingCapacity() > 0) {
-                this.pool.add(mapReduce.clone());
-            }
+        this.pool = new LinkedBlockingQueue<>(poolSize);
+        while (this.pool.remainingCapacity() > 0) {
+            this.pool.add(mapReduce.clone());
         }
     }
 
     public MapReduce take() {
-        if (null == this.mapReduce) {
-            try {
-                return this.pool.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
-            } catch (final InterruptedException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        } else {
-            return this.mapReduce;
+        try {
+            return this.pool.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
         }
     }
 
     public void offer(final MapReduce<?, ?, ?, ?, ?> mapReduce) {
-        if (null == this.mapReduce) {
-            try {
-                this.pool.offer(mapReduce, TIMEOUT_MS, TimeUnit.MILLISECONDS);
-            } catch (final InterruptedException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
+        try {
+            this.pool.offer(mapReduce, TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
         }
     }
 
-    /*public synchronized void workerIterationStart(final Memory memory) {
-        if (null == this.mapReduce) {
-            for (final MapReduce<?,?,?,?,?> mapReduce : this.pool) {
-                mapReduce.workerIterationStart(memory);
-            }
-        } else {
-            this.mapReduce.workerIterationStart(memory);
+    public synchronized void workerStart(final MapReduce.Stage stage) {
+        for (final MapReduce<?, ?, ?, ?, ?> mapReduce : this.pool) {
+            mapReduce.workerStart(stage);
         }
-    }*/
+    }
 
-    /*public synchronized void workerIterationEnd(final Memory memory) {
-        if (null == this.mapReduce) {
-            for (final MapReduce<?,?,?,?,?> mapReduce : this.pool) {
-                mapReduce.workerIterationEnd(memory);
-            }
-        } else {
-            this.mapReduce.workerIterationEnd(memory);
+    public synchronized void workerEnd(final MapReduce.Stage stage) {
+        for (final MapReduce<?, ?, ?, ?, ?> mapReduce : this.pool) {
+            mapReduce.workerEnd(stage);
         }
-    }*/
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
index 509b16d..0632708 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
@@ -30,61 +30,40 @@ import java.util.concurrent.TimeUnit;
 public final class VertexProgramPool {
 
     private final LinkedBlockingQueue<VertexProgram<?>> pool;
-    private final StaticVertexProgram vertexProgram;
     private static final int TIMEOUT_MS = 2500;
 
     public VertexProgramPool(final VertexProgram vertexProgram, final int poolSize) {
-        if (vertexProgram instanceof StaticVertexProgram) {
-            this.pool = null;
-            this.vertexProgram = (StaticVertexProgram) vertexProgram;
-        } else {
-            this.vertexProgram = null;
-            this.pool = new LinkedBlockingQueue<>(poolSize);
-            while (this.pool.remainingCapacity() > 0) {
-                this.pool.add(vertexProgram.clone());
-            }
+        this.pool = new LinkedBlockingQueue<>(poolSize);
+        while (this.pool.remainingCapacity() > 0) {
+            this.pool.add(vertexProgram.clone());
         }
     }
 
     public VertexProgram take() {
-        if (null == this.vertexProgram) {
-            try {
-                return this.pool.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
-            } catch (final InterruptedException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
-        } else {
-            return this.vertexProgram;
+        try {
+            return this.pool.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
         }
     }
 
     public void offer(final VertexProgram<?> vertexProgram) {
-        if (null == this.vertexProgram) {
-            try {
-                this.pool.offer(vertexProgram, TIMEOUT_MS, TimeUnit.MILLISECONDS);
-            } catch (final InterruptedException e) {
-                throw new IllegalStateException(e.getMessage(), e);
-            }
+        try {
+            this.pool.offer(vertexProgram, TIMEOUT_MS, TimeUnit.MILLISECONDS);
+        } catch (final InterruptedException e) {
+            throw new IllegalStateException(e.getMessage(), e);
         }
     }
 
     public synchronized void workerIterationStart(final Memory memory) {
-        if (null == this.vertexProgram) {
-            for (final VertexProgram<?> vertexProgram : this.pool) {
-                vertexProgram.workerIterationStart(memory);
-            }
-        } else {
-            this.vertexProgram.workerIterationStart(memory);
+        for (final VertexProgram<?> vertexProgram : this.pool) {
+            vertexProgram.workerIterationStart(memory);
         }
     }
 
     public synchronized void workerIterationEnd(final Memory memory) {
-        if (null == this.vertexProgram) {
-            for (final VertexProgram<?> vertexProgram : this.pool) {
-                vertexProgram.workerIterationEnd(memory);
-            }
-        } else {
-            this.vertexProgram.workerIterationEnd(memory);
+        for (final VertexProgram<?> vertexProgram : this.pool) {
+            vertexProgram.workerIterationEnd(memory);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
index 336519e..e7b94b5 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
@@ -18,15 +18,15 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer;
 
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Iterator;
 
 /**
  * @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -46,23 +46,18 @@ public class HadoopCombine extends Reducer<ObjectWritable, ObjectWritable, Objec
     @Override
     public void setup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) {
         this.mapReduce = MapReduce.createMapReduce(ConfUtil.makeApacheConfiguration(context.getConfiguration()));
+        this.mapReduce.workerStart(MapReduce.Stage.COMBINE);
     }
 
     @Override
     public void reduce(final ObjectWritable key, final Iterable<ObjectWritable> values, final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) throws IOException, InterruptedException {
-        final Iterator<ObjectWritable> itty = values.iterator();
         this.combineEmitter.setContext(context);
-        this.mapReduce.combine(key.get(), new Iterator() {
-            @Override
-            public boolean hasNext() {
-                return itty.hasNext();
-            }
+        this.mapReduce.combine(key.get(), IteratorUtils.map(values.iterator(), ObjectWritable::get), this.combineEmitter);
+    }
 
-            @Override
-            public Object next() {
-                return itty.next().get();
-            }
-        }, this.combineEmitter);
+    @Override
+    public void cleanup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) {
+        this.mapReduce.workerEnd(MapReduce.Stage.COMBINE);
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
index 0a6d41c..d88c134 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
@@ -18,12 +18,12 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer;
 
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Mapper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -45,6 +45,7 @@ public class HadoopMap extends Mapper<NullWritable, VertexWritable, ObjectWritab
     @Override
     public void setup(final Mapper<NullWritable, VertexWritable, ObjectWritable, ObjectWritable>.Context context) {
         this.mapReduce = MapReduce.createMapReduce(ConfUtil.makeApacheConfiguration(context.getConfiguration()));
+        this.mapReduce.workerStart(MapReduce.Stage.MAP);
     }
 
     @Override
@@ -53,6 +54,11 @@ public class HadoopMap extends Mapper<NullWritable, VertexWritable, ObjectWritab
         this.mapReduce.map(value.get(), this.mapEmitter);
     }
 
+    @Override
+    public void cleanup(final Mapper<NullWritable, VertexWritable, ObjectWritable, ObjectWritable>.Context context) {
+        this.mapReduce.workerEnd(MapReduce.Stage.MAP);
+    }
+
     public class HadoopMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
 
         private Mapper<NullWritable, VertexWritable, ObjectWritable, ObjectWritable>.Context context;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
index 66af73e..a0ddf67 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
@@ -18,10 +18,11 @@
  */
 package org.apache.tinkerpop.gremlin.hadoop.process.computer;
 
+import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
 import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
 import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -44,23 +45,18 @@ public class HadoopReduce extends Reducer<ObjectWritable, ObjectWritable, Object
     @Override
     public void setup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) {
         this.mapReduce = MapReduce.createMapReduce(ConfUtil.makeApacheConfiguration(context.getConfiguration()));
+        this.mapReduce.workerStart(MapReduce.Stage.REDUCE);
     }
 
     @Override
     public void reduce(final ObjectWritable key, final Iterable<ObjectWritable> values, final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) throws IOException, InterruptedException {
-        final Iterator<ObjectWritable> itty = values.iterator();
         this.reduceEmitter.setContext(context);
-        this.mapReduce.reduce(key.get(), new Iterator() {
-            @Override
-            public boolean hasNext() {
-                return itty.hasNext();
-            }
+        this.mapReduce.reduce(key.get(), IteratorUtils.map(values.iterator(), ObjectWritable::get), this.reduceEmitter);
+    }
 
-            @Override
-            public Object next() {
-                return itty.next().get();
-            }
-        }, this.reduceEmitter);
+    @Override
+    public void cleanup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) {
+        this.mapReduce.workerEnd(MapReduce.Stage.REDUCE);
     }
 
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index e74f0fd..6cd1f49 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -45,6 +45,8 @@ import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
 import scala.Tuple2;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 
@@ -63,11 +65,13 @@ public final class SparkHelper {
             final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
             final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
             workerVertexProgram.workerIterationStart(memory);
-            return () -> IteratorUtils.<Tuple2<Object, SparkPayload<M>>, Tuple2<Object, SparkPayload<M>>>map(partitionIterator, keyValue -> {
+            final List<Tuple2<Object, SparkPayload<M>>> emission = new ArrayList<>();
+            partitionIterator.forEachRemaining(keyValue -> {
                 workerVertexProgram.execute(ComputerGraph.of(keyValue._2().asVertexPayload().getVertex(), elementComputeKeys), keyValue._2().asVertexPayload(), memory);
-                if (!partitionIterator.hasNext()) workerVertexProgram.workerIterationEnd(memory);  // is this safe?
-                return keyValue;
+                emission.add(keyValue);
             });
+            workerVertexProgram.workerIterationEnd(memory);
+            return emission;
         });
 
         // emit messages by appending them to the graph as message payloads
@@ -107,29 +111,33 @@ public final class SparkHelper {
         return current;
     }
 
-    public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> globalMapReduce, final Configuration apacheConfiguration) {
+    public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
         JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
-            final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.createMapReduce(apacheConfiguration);
+            final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
+            workerMapReduce.workerStart(MapReduce.Stage.MAP);
             final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
             partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().asVertexPayload().getVertex(), mapEmitter));
+            workerMapReduce.workerEnd(MapReduce.Stage.MAP);
             return mapEmitter.getEmissions();
         });
-        if (globalMapReduce.getMapKeySort().isPresent())
-            mapRDD = mapRDD.sortByKey(globalMapReduce.getMapKeySort().get());
+        if (mapReduce.getMapKeySort().isPresent())
+            mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
         return mapRDD;
     }
 
-    // TODO: public static executeCombine()
+    // TODO: public static executeCombine()  is this necessary?
 
-    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> globalMapReduce, final Configuration apacheConfiguration) {
+    public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
         JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
-            final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.createMapReduce(apacheConfiguration);
+            final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(apacheConfiguration);
+            workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
             final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
             partitionIterator.forEachRemaining(keyValue -> workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter));
+            workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
             return reduceEmitter.getEmissions();
         });
-        if (globalMapReduce.getReduceKeySort().isPresent())
-            reduceRDD = reduceRDD.sortByKey(globalMapReduce.getReduceKeySort().get());
+        if (mapReduce.getReduceKeySort().isPresent())
+            reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
         return reduceRDD;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableComparator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableComparator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableComparator.java
index edab584..7ecdd7d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableComparator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableComparator.java
@@ -69,7 +69,7 @@ public abstract class ObjectWritableComparator implements RawComparator<ObjectWr
         @Override
         public void setConf(final Configuration configuration) {
             this.configuration = configuration;
-            this.comparator = MapReduce.createMapReduce(ConfUtil.makeApacheConfiguration(configuration)).getMapKeySort().get();
+            this.comparator = MapReduce.<MapReduce<?,?,?,?,?>>createMapReduce(ConfUtil.makeApacheConfiguration(configuration)).getMapKeySort().get();
         }
     }
 
@@ -77,7 +77,7 @@ public abstract class ObjectWritableComparator implements RawComparator<ObjectWr
         @Override
         public void setConf(final Configuration configuration) {
             this.configuration = configuration;
-            this.comparator = MapReduce.createMapReduce(ConfUtil.makeApacheConfiguration(configuration)).getReduceKeySort().get();
+            this.comparator = MapReduce.<MapReduce<?,?,?,?,?>>createMapReduce(ConfUtil.makeApacheConfiguration(configuration)).getReduceKeySort().get();
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 6e55987..600a320 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -139,6 +139,7 @@ public class TinkerGraphComputer implements GraphComputer {
                         final TinkerMapEmitter<?, ?> mapEmitter = new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
                         final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(computeGraph.vertices());
                         workers.setMapReduce(mapReduce);
+                        workers.mapReduceWorkerStart(MapReduce.Stage.MAP);
                         workers.executeMapReduce(workerMapReduce -> {
                             while (true) {
                                 final Vertex vertex = vertices.next();
@@ -146,6 +147,7 @@ public class TinkerGraphComputer implements GraphComputer {
                                 workerMapReduce.map(vertex, mapEmitter);
                             }
                         });
+                        workers.mapReduceWorkerEnd(MapReduce.Stage.MAP);
 
                         // sort results if a map output sort is defined
                         mapEmitter.complete(mapReduce);
@@ -154,6 +156,7 @@ public class TinkerGraphComputer implements GraphComputer {
                         if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
                             final TinkerReduceEmitter<?, ?> reduceEmitter = new TinkerReduceEmitter<>();
                             final SynchronizedIterator<Map.Entry<?, Queue<?>>> keyValues = new SynchronizedIterator((Iterator) mapEmitter.reduceMap.entrySet().iterator());
+                            workers.mapReduceWorkerStart(MapReduce.Stage.REDUCE);
                             workers.executeMapReduce(workerMapReduce -> {
                                 while (true) {
                                     final Map.Entry<?, Queue<?>> entry = keyValues.next();
@@ -161,6 +164,7 @@ public class TinkerGraphComputer implements GraphComputer {
                                     workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
                                 }
                             });
+                            workers.mapReduceWorkerEnd(MapReduce.Stage.REDUCE);
                             reduceEmitter.complete(mapReduce); // sort results if a reduce output sort is defined
                             mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
                         } else {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index 2e0030d..79aa788 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -56,6 +56,8 @@ public class TinkerWorkerPool implements AutoCloseable {
         this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
     }
 
+    ////
+
     public void vertexProgramWorkerIterationStart(final Memory memory) {
         this.vertexProgramPool.workerIterationStart(memory);
     }
@@ -76,6 +78,17 @@ public class TinkerWorkerPool implements AutoCloseable {
         }
     }
 
+    ///
+
+    public void mapReduceWorkerStart(final MapReduce.Stage stage) {
+        this.mapReducePool.workerStart(stage);
+    }
+
+    public void mapReduceWorkerEnd(final MapReduce.Stage stage) {
+        this.mapReducePool.workerEnd(stage);
+    }
+
+
     public void executeMapReduce(final Consumer<MapReduce> worker) {
         try {
             this.workerPool.submit(() -> {