You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/19 18:17:41 UTC
incubator-tinkerpop git commit: MapReduce now has a worker start/end
method. Fixed up TinkerGraphComputer, SparkGraphComputer,
and GiraphGraphComputer to support the new method. Fixed a bug in Spark
message passing.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 54ac15059 -> b563dc3d7
MapReduce now has a worker start/end method. Fixed up TinkerGraphComputer, SparkGraphComputer, and GiraphGraphComputer to support the new method. Fixed a bug in Spark message passing.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/b563dc3d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/b563dc3d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/b563dc3d
Branch: refs/heads/master
Commit: b563dc3d7cdb5829ba1d1a5e33483467fa92e39b
Parents: 54ac150
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Mar 19 11:17:37 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Mar 19 11:17:37 2015 -0600
----------------------------------------------------------------------
.../gremlin/process/computer/MapReduce.java | 28 ++++++++-
.../gremlin/process/computer/VertexProgram.java | 8 +--
.../process/computer/util/MapReducePool.java | 60 +++++++-------------
.../computer/util/VertexProgramPool.java | 51 +++++------------
.../hadoop/process/computer/HadoopCombine.java | 21 +++----
.../hadoop/process/computer/HadoopMap.java | 10 +++-
.../hadoop/process/computer/HadoopReduce.java | 20 +++----
.../computer/spark/util/SparkHelper.java | 32 +++++++----
.../structure/io/ObjectWritableComparator.java | 4 +-
.../process/computer/TinkerGraphComputer.java | 4 ++
.../process/computer/TinkerWorkerPool.java | 13 +++++
11 files changed, 128 insertions(+), 123 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
index f40ce0d..91e3afe 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/MapReduce.java
@@ -18,8 +18,8 @@
*/
package org.apache.tinkerpop.gremlin.process.computer;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -121,6 +121,30 @@ public interface MapReduce<MK, MV, RK, RV, R> extends Cloneable {
}
/**
+ * This method is called at the start of the respective {@link MapReduce.Stage} for a particular "chunk of vertices."
+ * The set of vertices in the graph are typically not processed with full parallelism.
+ * The vertex set is split into subsets and a worker is assigned to call the MapReduce methods on each subset.
+ * The default implementation is a no-op.
+ *
+ * @param stage the stage of the MapReduce computation
+ */
+ public default void workerStart(final Stage stage) {
+
+ }
+
+ /**
+ * This method is called at the end of the respective {@link MapReduce.Stage} for a particular "chunk of vertices."
+ * The set of vertices in the graph are typically not processed with full parallelism.
+ * The vertex set is split into subsets and a worker is assigned to call the MapReduce methods on each subset.
+ * The default implementation is a no-op.
+ *
+ * @param stage the stage of the MapReduce computation
+ */
+ public default void workerEnd(final Stage stage) {
+
+ }
+
+ /**
* If a {@link Comparator} is provided, then all pairs leaving the {@link MapEmitter} are sorted.
* The sorted results are either fed sorted to the combine/reduce-stage or as the final output.
* If sorting is not required, then {@link Optional#empty} should be returned as sorting is computationally expensive.
@@ -187,7 +211,7 @@ public interface MapReduce<MK, MV, RK, RV, R> extends Cloneable {
* @param configuration A configuration with requisite information to build a MapReduce
* @return the newly constructed MapReduce
*/
- public static <M extends MapReduce<MK, MV, RK, RV, R>, MK, MV, RK, RV, R> M createMapReduce(final Configuration configuration) {
+ public static <M extends MapReduce> M createMapReduce(final Configuration configuration) {
try {
final Class<M> mapReduceClass = (Class) Class.forName(configuration.getString(MAP_REDUCE));
final Constructor<M> constructor = mapReduceClass.getDeclaredConstructor();
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
index e93321d..4cfebb2 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/VertexProgram.java
@@ -19,8 +19,8 @@
package org.apache.tinkerpop.gremlin.process.computer;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.commons.configuration.Configuration;
+import org.apache.tinkerpop.gremlin.structure.Vertex;
import java.lang.reflect.Constructor;
import java.util.Collections;
@@ -45,6 +45,8 @@ public interface VertexProgram<M> extends Cloneable {
public static final String VERTEX_PROGRAM = "gremlin.vertexProgram";
+ public enum Status {SUCCESS, FAILURE}
+
/**
* When it is necessary to store the state of the VertexProgram, this method is called.
* This is typically required when the VertexProgram needs to be serialized to another machine.
@@ -106,7 +108,6 @@ public interface VertexProgram<M> extends Cloneable {
* This method is called at the start of each iteration of each "computational chunk."
* The set of vertices in the graph are typically not processed with full parallelism.
* The vertex set is split into subsets and a worker is assigned to call the {@link VertexProgram#execute} method.
- * The typical use is to create static VertexProgram state that exists for the iteration of the vertex subset.
* The default implementation is a no-op.
*
* @param memory The memory at the start of the iteration.
@@ -119,10 +120,9 @@ public interface VertexProgram<M> extends Cloneable {
* This method is called at the end of each iteration of each "computational chunk."
* The set of vertices in the graph are typically not processed with full parallelism.
* The vertex set is split into subsets and a worker is assigned to call the {@link VertexProgram#execute} method.
- * The typical use is to destroy static VertexProgram state that existed during the iteration of the vertex subset.
* The default implementation is a no-op.
*
- * @param memory The memory at the start of the iteration.
+ * @param memory The memory at the end of the iteration.
*/
public default void workerIterationEnd(final Memory memory) {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapReducePool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapReducePool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapReducePool.java
index 167b01a..1efe26c 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapReducePool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/MapReducePool.java
@@ -19,6 +19,7 @@
package org.apache.tinkerpop.gremlin.process.computer.util;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
+import org.apache.tinkerpop.gremlin.process.computer.Memory;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -28,61 +29,40 @@ import java.util.concurrent.TimeUnit;
*/
public final class MapReducePool {
private final LinkedBlockingQueue<MapReduce<?, ?, ?, ?, ?>> pool;
- private final StaticMapReduce mapReduce;
private static final int TIMEOUT_MS = 2500;
public MapReducePool(final MapReduce mapReduce, final int poolSize) {
- if (mapReduce instanceof StaticMapReduce) {
- this.pool = null;
- this.mapReduce = (StaticMapReduce) mapReduce;
- } else {
- this.mapReduce = null;
- this.pool = new LinkedBlockingQueue<>(poolSize);
- while (this.pool.remainingCapacity() > 0) {
- this.pool.add(mapReduce.clone());
- }
+ this.pool = new LinkedBlockingQueue<>(poolSize);
+ while (this.pool.remainingCapacity() > 0) {
+ this.pool.add(mapReduce.clone());
}
}
public MapReduce take() {
- if (null == this.mapReduce) {
- try {
- return this.pool.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (final InterruptedException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- } else {
- return this.mapReduce;
+ try {
+ return this.pool.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException e) {
+ throw new IllegalStateException(e.getMessage(), e);
}
}
public void offer(final MapReduce<?, ?, ?, ?, ?> mapReduce) {
- if (null == this.mapReduce) {
- try {
- this.pool.offer(mapReduce, TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (final InterruptedException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
+ try {
+ this.pool.offer(mapReduce, TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException e) {
+ throw new IllegalStateException(e.getMessage(), e);
}
}
- /*public synchronized void workerIterationStart(final Memory memory) {
- if (null == this.mapReduce) {
- for (final MapReduce<?,?,?,?,?> mapReduce : this.pool) {
- mapReduce.workerIterationStart(memory);
- }
- } else {
- this.mapReduce.workerIterationStart(memory);
+ public synchronized void workerStart(final MapReduce.Stage stage) {
+ for (final MapReduce<?, ?, ?, ?, ?> mapReduce : this.pool) {
+ mapReduce.workerStart(stage);
}
- }*/
+ }
- /*public synchronized void workerIterationEnd(final Memory memory) {
- if (null == this.mapReduce) {
- for (final MapReduce<?,?,?,?,?> mapReduce : this.pool) {
- mapReduce.workerIterationEnd(memory);
- }
- } else {
- this.mapReduce.workerIterationEnd(memory);
+ public synchronized void workerEnd(final MapReduce.Stage stage) {
+ for (final MapReduce<?, ?, ?, ?, ?> mapReduce : this.pool) {
+ mapReduce.workerEnd(stage);
}
- }*/
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
index 509b16d..0632708 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/computer/util/VertexProgramPool.java
@@ -30,61 +30,40 @@ import java.util.concurrent.TimeUnit;
public final class VertexProgramPool {
private final LinkedBlockingQueue<VertexProgram<?>> pool;
- private final StaticVertexProgram vertexProgram;
private static final int TIMEOUT_MS = 2500;
public VertexProgramPool(final VertexProgram vertexProgram, final int poolSize) {
- if (vertexProgram instanceof StaticVertexProgram) {
- this.pool = null;
- this.vertexProgram = (StaticVertexProgram) vertexProgram;
- } else {
- this.vertexProgram = null;
- this.pool = new LinkedBlockingQueue<>(poolSize);
- while (this.pool.remainingCapacity() > 0) {
- this.pool.add(vertexProgram.clone());
- }
+ this.pool = new LinkedBlockingQueue<>(poolSize);
+ while (this.pool.remainingCapacity() > 0) {
+ this.pool.add(vertexProgram.clone());
}
}
public VertexProgram take() {
- if (null == this.vertexProgram) {
- try {
- return this.pool.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (final InterruptedException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
- } else {
- return this.vertexProgram;
+ try {
+ return this.pool.poll(TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException e) {
+ throw new IllegalStateException(e.getMessage(), e);
}
}
public void offer(final VertexProgram<?> vertexProgram) {
- if (null == this.vertexProgram) {
- try {
- this.pool.offer(vertexProgram, TIMEOUT_MS, TimeUnit.MILLISECONDS);
- } catch (final InterruptedException e) {
- throw new IllegalStateException(e.getMessage(), e);
- }
+ try {
+ this.pool.offer(vertexProgram, TIMEOUT_MS, TimeUnit.MILLISECONDS);
+ } catch (final InterruptedException e) {
+ throw new IllegalStateException(e.getMessage(), e);
}
}
public synchronized void workerIterationStart(final Memory memory) {
- if (null == this.vertexProgram) {
- for (final VertexProgram<?> vertexProgram : this.pool) {
- vertexProgram.workerIterationStart(memory);
- }
- } else {
- this.vertexProgram.workerIterationStart(memory);
+ for (final VertexProgram<?> vertexProgram : this.pool) {
+ vertexProgram.workerIterationStart(memory);
}
}
public synchronized void workerIterationEnd(final Memory memory) {
- if (null == this.vertexProgram) {
- for (final VertexProgram<?> vertexProgram : this.pool) {
- vertexProgram.workerIterationEnd(memory);
- }
- } else {
- this.vertexProgram.workerIterationEnd(memory);
+ for (final VertexProgram<?> vertexProgram : this.pool) {
+ vertexProgram.workerIterationEnd(memory);
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
index 336519e..e7b94b5 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopCombine.java
@@ -18,15 +18,15 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.process.computer;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Iterator;
/**
* @author Marko A. Rodriguez (http://markorodriguez.com)
@@ -46,23 +46,18 @@ public class HadoopCombine extends Reducer<ObjectWritable, ObjectWritable, Objec
@Override
public void setup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) {
this.mapReduce = MapReduce.createMapReduce(ConfUtil.makeApacheConfiguration(context.getConfiguration()));
+ this.mapReduce.workerStart(MapReduce.Stage.COMBINE);
}
@Override
public void reduce(final ObjectWritable key, final Iterable<ObjectWritable> values, final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) throws IOException, InterruptedException {
- final Iterator<ObjectWritable> itty = values.iterator();
this.combineEmitter.setContext(context);
- this.mapReduce.combine(key.get(), new Iterator() {
- @Override
- public boolean hasNext() {
- return itty.hasNext();
- }
+ this.mapReduce.combine(key.get(), IteratorUtils.map(values.iterator(), ObjectWritable::get), this.combineEmitter);
+ }
- @Override
- public Object next() {
- return itty.next().get();
- }
- }, this.combineEmitter);
+ @Override
+ public void cleanup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) {
+ this.mapReduce.workerEnd(MapReduce.Stage.COMBINE);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
index 0a6d41c..d88c134 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopMap.java
@@ -18,12 +18,12 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.process.computer;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.Mapper;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.VertexWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.hadoop.io.NullWritable;
-import org.apache.hadoop.mapreduce.Mapper;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -45,6 +45,7 @@ public class HadoopMap extends Mapper<NullWritable, VertexWritable, ObjectWritab
@Override
public void setup(final Mapper<NullWritable, VertexWritable, ObjectWritable, ObjectWritable>.Context context) {
this.mapReduce = MapReduce.createMapReduce(ConfUtil.makeApacheConfiguration(context.getConfiguration()));
+ this.mapReduce.workerStart(MapReduce.Stage.MAP);
}
@Override
@@ -53,6 +54,11 @@ public class HadoopMap extends Mapper<NullWritable, VertexWritable, ObjectWritab
this.mapReduce.map(value.get(), this.mapEmitter);
}
+ @Override
+ public void cleanup(final Mapper<NullWritable, VertexWritable, ObjectWritable, ObjectWritable>.Context context) {
+ this.mapReduce.workerEnd(MapReduce.Stage.MAP);
+ }
+
public class HadoopMapEmitter<K, V> implements MapReduce.MapEmitter<K, V> {
private Mapper<NullWritable, VertexWritable, ObjectWritable, ObjectWritable>.Context context;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
index 66af73e..a0ddf67 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/HadoopReduce.java
@@ -18,10 +18,11 @@
*/
package org.apache.tinkerpop.gremlin.hadoop.process.computer;
+import org.apache.hadoop.mapreduce.Reducer;
import org.apache.tinkerpop.gremlin.hadoop.structure.io.ObjectWritable;
import org.apache.tinkerpop.gremlin.hadoop.structure.util.ConfUtil;
import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
-import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,23 +45,18 @@ public class HadoopReduce extends Reducer<ObjectWritable, ObjectWritable, Object
@Override
public void setup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) {
this.mapReduce = MapReduce.createMapReduce(ConfUtil.makeApacheConfiguration(context.getConfiguration()));
+ this.mapReduce.workerStart(MapReduce.Stage.REDUCE);
}
@Override
public void reduce(final ObjectWritable key, final Iterable<ObjectWritable> values, final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) throws IOException, InterruptedException {
- final Iterator<ObjectWritable> itty = values.iterator();
this.reduceEmitter.setContext(context);
- this.mapReduce.reduce(key.get(), new Iterator() {
- @Override
- public boolean hasNext() {
- return itty.hasNext();
- }
+ this.mapReduce.reduce(key.get(), IteratorUtils.map(values.iterator(), ObjectWritable::get), this.reduceEmitter);
+ }
- @Override
- public Object next() {
- return itty.next().get();
- }
- }, this.reduceEmitter);
+ @Override
+ public void cleanup(final Reducer<ObjectWritable, ObjectWritable, ObjectWritable, ObjectWritable>.Context context) {
+ this.mapReduce.workerEnd(MapReduce.Stage.REDUCE);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index e74f0fd..6cd1f49 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -45,6 +45,8 @@ import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.Tuple2;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Optional;
import java.util.Set;
@@ -63,11 +65,13 @@ public final class SparkHelper {
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys();
workerVertexProgram.workerIterationStart(memory);
- return () -> IteratorUtils.<Tuple2<Object, SparkPayload<M>>, Tuple2<Object, SparkPayload<M>>>map(partitionIterator, keyValue -> {
+ final List<Tuple2<Object, SparkPayload<M>>> emission = new ArrayList<>();
+ partitionIterator.forEachRemaining(keyValue -> {
workerVertexProgram.execute(ComputerGraph.of(keyValue._2().asVertexPayload().getVertex(), elementComputeKeys), keyValue._2().asVertexPayload(), memory);
- if (!partitionIterator.hasNext()) workerVertexProgram.workerIterationEnd(memory); // is this safe?
- return keyValue;
+ emission.add(keyValue);
});
+ workerVertexProgram.workerIterationEnd(memory);
+ return emission;
});
// emit messages by appending them to the graph as message payloads
@@ -107,29 +111,33 @@ public final class SparkHelper {
return current;
}
- public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> globalMapReduce, final Configuration apacheConfiguration) {
+ public static <K, V, M> JavaPairRDD<K, V> executeMap(final JavaPairRDD<Object, SparkPayload<M>> graphRDD, final MapReduce<K, V, ?, ?, ?> mapReduce, final Configuration apacheConfiguration) {
JavaPairRDD<K, V> mapRDD = graphRDD.mapPartitionsToPair(partitionIterator -> {
- final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.createMapReduce(apacheConfiguration);
+ final MapReduce<K, V, ?, ?, ?> workerMapReduce = MapReduce.<MapReduce<K, V, ?, ?, ?>>createMapReduce(apacheConfiguration);
+ workerMapReduce.workerStart(MapReduce.Stage.MAP);
final SparkMapEmitter<K, V> mapEmitter = new SparkMapEmitter<>();
partitionIterator.forEachRemaining(keyValue -> workerMapReduce.map(keyValue._2().asVertexPayload().getVertex(), mapEmitter));
+ workerMapReduce.workerEnd(MapReduce.Stage.MAP);
return mapEmitter.getEmissions();
});
- if (globalMapReduce.getMapKeySort().isPresent())
- mapRDD = mapRDD.sortByKey(globalMapReduce.getMapKeySort().get());
+ if (mapReduce.getMapKeySort().isPresent())
+ mapRDD = mapRDD.sortByKey(mapReduce.getMapKeySort().get());
return mapRDD;
}
- // TODO: public static executeCombine()
+ // TODO: public static executeCombine() is this necessary?
- public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> globalMapReduce, final Configuration apacheConfiguration) {
+ public static <K, V, OK, OV> JavaPairRDD<OK, OV> executeReduce(final JavaPairRDD<K, V> mapRDD, final MapReduce<K, V, OK, OV, ?> mapReduce, final Configuration apacheConfiguration) {
JavaPairRDD<OK, OV> reduceRDD = mapRDD.groupByKey().mapPartitionsToPair(partitionIterator -> {
- final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.createMapReduce(apacheConfiguration);
+ final MapReduce<K, V, OK, OV, ?> workerMapReduce = MapReduce.<MapReduce<K, V, OK, OV, ?>>createMapReduce(apacheConfiguration);
+ workerMapReduce.workerStart(MapReduce.Stage.REDUCE);
final SparkReduceEmitter<OK, OV> reduceEmitter = new SparkReduceEmitter<>();
partitionIterator.forEachRemaining(keyValue -> workerMapReduce.reduce(keyValue._1(), keyValue._2().iterator(), reduceEmitter));
+ workerMapReduce.workerEnd(MapReduce.Stage.REDUCE);
return reduceEmitter.getEmissions();
});
- if (globalMapReduce.getReduceKeySort().isPresent())
- reduceRDD = reduceRDD.sortByKey(globalMapReduce.getReduceKeySort().get());
+ if (mapReduce.getReduceKeySort().isPresent())
+ reduceRDD = reduceRDD.sortByKey(mapReduce.getReduceKeySort().get());
return reduceRDD;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableComparator.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableComparator.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableComparator.java
index edab584..7ecdd7d 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableComparator.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/structure/io/ObjectWritableComparator.java
@@ -69,7 +69,7 @@ public abstract class ObjectWritableComparator implements RawComparator<ObjectWr
@Override
public void setConf(final Configuration configuration) {
this.configuration = configuration;
- this.comparator = MapReduce.createMapReduce(ConfUtil.makeApacheConfiguration(configuration)).getMapKeySort().get();
+ this.comparator = MapReduce.<MapReduce<?,?,?,?,?>>createMapReduce(ConfUtil.makeApacheConfiguration(configuration)).getMapKeySort().get();
}
}
@@ -77,7 +77,7 @@ public abstract class ObjectWritableComparator implements RawComparator<ObjectWr
@Override
public void setConf(final Configuration configuration) {
this.configuration = configuration;
- this.comparator = MapReduce.createMapReduce(ConfUtil.makeApacheConfiguration(configuration)).getReduceKeySort().get();
+ this.comparator = MapReduce.<MapReduce<?,?,?,?,?>>createMapReduce(ConfUtil.makeApacheConfiguration(configuration)).getReduceKeySort().get();
}
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
index 6e55987..600a320 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerGraphComputer.java
@@ -139,6 +139,7 @@ public class TinkerGraphComputer implements GraphComputer {
final TinkerMapEmitter<?, ?> mapEmitter = new TinkerMapEmitter<>(mapReduce.doStage(MapReduce.Stage.REDUCE));
final SynchronizedIterator<Vertex> vertices = new SynchronizedIterator<>(computeGraph.vertices());
workers.setMapReduce(mapReduce);
+ workers.mapReduceWorkerStart(MapReduce.Stage.MAP);
workers.executeMapReduce(workerMapReduce -> {
while (true) {
final Vertex vertex = vertices.next();
@@ -146,6 +147,7 @@ public class TinkerGraphComputer implements GraphComputer {
workerMapReduce.map(vertex, mapEmitter);
}
});
+ workers.mapReduceWorkerEnd(MapReduce.Stage.MAP);
// sort results if a map output sort is defined
mapEmitter.complete(mapReduce);
@@ -154,6 +156,7 @@ public class TinkerGraphComputer implements GraphComputer {
if (mapReduce.doStage(MapReduce.Stage.REDUCE)) {
final TinkerReduceEmitter<?, ?> reduceEmitter = new TinkerReduceEmitter<>();
final SynchronizedIterator<Map.Entry<?, Queue<?>>> keyValues = new SynchronizedIterator((Iterator) mapEmitter.reduceMap.entrySet().iterator());
+ workers.mapReduceWorkerStart(MapReduce.Stage.REDUCE);
workers.executeMapReduce(workerMapReduce -> {
while (true) {
final Map.Entry<?, Queue<?>> entry = keyValues.next();
@@ -161,6 +164,7 @@ public class TinkerGraphComputer implements GraphComputer {
workerMapReduce.reduce(entry.getKey(), entry.getValue().iterator(), reduceEmitter);
}
});
+ workers.mapReduceWorkerEnd(MapReduce.Stage.REDUCE);
reduceEmitter.complete(mapReduce); // sort results if a reduce output sort is defined
mapReduce.addResultToMemory(this.memory, reduceEmitter.reduceQueue.iterator());
} else {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/b563dc3d/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
----------------------------------------------------------------------
diff --git a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
index 2e0030d..79aa788 100644
--- a/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
+++ b/tinkergraph-gremlin/src/main/java/org/apache/tinkerpop/gremlin/tinkergraph/process/computer/TinkerWorkerPool.java
@@ -56,6 +56,8 @@ public class TinkerWorkerPool implements AutoCloseable {
this.mapReducePool = new MapReducePool(mapReduce, this.numberOfWorkers);
}
+ ////
+
public void vertexProgramWorkerIterationStart(final Memory memory) {
this.vertexProgramPool.workerIterationStart(memory);
}
@@ -76,6 +78,17 @@ public class TinkerWorkerPool implements AutoCloseable {
}
}
+ ///
+
+ public void mapReduceWorkerStart(final MapReduce.Stage stage) {
+ this.mapReducePool.workerStart(stage);
+ }
+
+ public void mapReduceWorkerEnd(final MapReduce.Stage stage) {
+ this.mapReducePool.workerEnd(stage);
+ }
+
+
public void executeMapReduce(final Consumer<MapReduce> worker) {
try {
this.workerPool.submit(() -> {