You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/03/04 22:12:25 UTC
incubator-tinkerpop git commit: SparkGraphComputer is becoming a well-oiled
machine... so many tweaks here and there that are making it more and more
memory efficient and lazy.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 4fb660488 -> fca86b9ef
SparkGraphComputer is becoming a well-oiled machine... so many tweaks here and there that are making it more and more memory efficient and lazy.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/fca86b9e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/fca86b9e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/fca86b9e
Branch: refs/heads/master
Commit: fca86b9efdda5e5c8101b68ba7f873ef654471e4
Parents: 4fb6604
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Wed Mar 4 14:12:21 2015 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Mar 4 14:12:21 2015 -0700
----------------------------------------------------------------------
.../computer/spark/SparkGraphComputer.java | 24 +++-----
.../process/computer/spark/SparkMessenger.java | 61 +++++++-------------
.../computer/spark/util/SparkHelper.java | 38 ++++--------
3 files changed, 40 insertions(+), 83 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fca86b9e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 560acc0..571f222 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -130,16 +130,17 @@ public final class SparkGraphComputer implements GraphComputer {
hadoopConfiguration.forEach(entry -> sparkConfiguration.set(entry.getKey(), entry.getValue()));
if (FileInputFormat.class.isAssignableFrom(hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class)))
hadoopConfiguration.set(Constants.MAPRED_INPUT_DIR, hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION)); // necessary for Spark and newAPIHadoopRDD
- final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
- SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
- ///
- try {
+
+ // execute the vertex program
+ try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
+ // add the project jars to the cluster
+ SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
// create a message-passing friendly rdd from the hadoop input format
JavaPairRDD<Object, SparkMessenger<Object>> graphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
(Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
NullWritable.class,
VertexWritable.class)
- .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new SparkMessenger<>(new SparkVertex((TinkerVertex) tuple._2().get()))));
+ .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), SparkMessenger.forGraphVertex(new SparkVertex((TinkerVertex) tuple._2().get()))));
// set up the vertex program and wire up configurations
memory = new SparkMemory(this.vertexProgram, this.mapReducers, sparkContext);
@@ -164,11 +165,7 @@ public final class SparkGraphComputer implements GraphComputer {
// write the output graph back to disk
SparkHelper.saveVertexProgramRDD(graphRDD, hadoopConfiguration);
- } finally {
- // must close the context or bad things happen
- sparkContext.close();
}
- sparkContext.close(); // why not try again todo
}
//////////////////////////////
@@ -185,10 +182,9 @@ public final class SparkGraphComputer implements GraphComputer {
hadoopConfiguration.get(Constants.GREMLIN_HADOOP_INPUT_LOCATION) : // if no vertex program grab the graph from the input location
hadoopConfiguration.get(Constants.GREMLIN_HADOOP_OUTPUT_LOCATION) + "/" + Constants.SYSTEM_G);
- final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration);
- SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
// execute the map reduce job
- try {
+ try (final JavaSparkContext sparkContext = new JavaSparkContext(sparkConfiguration)) {
+ SparkGraphComputer.loadJars(sparkContext, hadoopConfiguration);
final JavaPairRDD<NullWritable, VertexWritable> hadoopGraphRDD = sparkContext.newAPIHadoopRDD(hadoopConfiguration,
(Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
NullWritable.class,
@@ -204,11 +200,7 @@ public final class SparkGraphComputer implements GraphComputer {
// write the map reduce output back to disk (memory)
SparkHelper.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
- } finally {
- // must close the context or bad things happen
- sparkContext.close();
}
- sparkContext.close(); // why not try again todo
}
// update runtime and return the newly computed graph
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fca86b9e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index 4fea02a..ea6187a 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -29,15 +29,13 @@ import org.apache.tinkerpop.gremlin.structure.Direction;
import org.apache.tinkerpop.gremlin.structure.Edge;
import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
+import scala.Tuple2;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.List;
-import java.util.Map;
import java.util.Optional;
-import java.util.Set;
import java.util.stream.Stream;
/**
@@ -47,21 +45,26 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
private Vertex vertex;
private List<M> incoming;
- private Map<Object, List<M>> outgoing;
+ private List<Tuple2<Object, M>> outgoing;
- public SparkMessenger() {
+ private SparkMessenger() {
}
- public SparkMessenger(final Vertex vertex) {
- this.vertex = vertex;
- this.incoming = new ArrayList<>();
- this.outgoing = this.vertex instanceof DetachedVertex ? null : new HashMap<>();
+ public static final <M> SparkMessenger<M> forGraphVertex(final Vertex graphVertex) {
+ final SparkMessenger<M> messenger = new SparkMessenger<>();
+ messenger.vertex = graphVertex;
+ messenger.incoming = new ArrayList<>();
+ messenger.outgoing = new ArrayList<>();
+ return messenger;
}
- public SparkMessenger(final Vertex vertex, final List<M> incomingMessages) {
- this.vertex = vertex;
- this.incoming = incomingMessages;
+ public static final <M> SparkMessenger<M> forMessageVertex(final Object vertexId, final M message) {
+ final SparkMessenger<M> messenger = new SparkMessenger<>();
+ messenger.vertex = new DetachedVertex(vertexId, Vertex.DEFAULT_LABEL, Collections.emptyMap());
+ messenger.incoming = new ArrayList<>();
+ messenger.incoming.add(message);
+ return messenger;
}
public void clearIncomingMessages() {
@@ -69,7 +72,8 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
}
public void clearOutgoingMessages() {
- this.outgoing.clear();
+ if (null != this.outgoing)
+ this.outgoing.clear();
}
public Vertex getVertex() {
@@ -92,45 +96,24 @@ public class SparkMessenger<M> implements Serializable, Messenger<M> {
}
}
- public Set<Map.Entry<Object, List<M>>> getOutgoingMessages() {
- return this.outgoing.entrySet();
+ public List<Tuple2<Object, M>> getOutgoingMessages() {
+ return this.outgoing;
}
@Override
public Iterable<M> receiveMessages(final MessageScope messageScope) {
- if (null == this.outgoing)
- throw new IllegalStateException("Message vertices can not receive messages");
-
- return null == this.incoming ? Collections.emptyList() : this.incoming;
+ return this.incoming;
}
@Override
public void sendMessage(final MessageScope messageScope, final M message) {
- if (null == this.outgoing)
- throw new IllegalStateException("Message vertices can not send messages");
-
if (messageScope instanceof MessageScope.Local) {
final MessageScope.Local<M> localMessageScope = (MessageScope.Local) messageScope;
final Traversal.Admin<Vertex, Edge> incidentTraversal = SparkMessenger.setVertexStart(localMessageScope.getIncidentTraversal().get(), this.vertex);
final Direction direction = SparkMessenger.getOppositeDirection(incidentTraversal);
- incidentTraversal.forEachRemaining(edge -> {
- final Object otherVertexId = edge.iterators().vertexIterator(direction).next().id();
- List<M> messages = this.outgoing.get(otherVertexId);
- if (null == messages) {
- messages = new ArrayList<>();
- this.outgoing.put(otherVertexId, messages);
- }
- messages.add(message);
- });
+ incidentTraversal.forEachRemaining(edge -> this.outgoing.add(new Tuple2<>(edge.iterators().vertexIterator(direction).next().id(), message)));
} else {
- ((MessageScope.Global) messageScope).vertices().forEach(v -> {
- List<M> messages = this.outgoing.get(v.id());
- if (null == messages) {
- messages = new ArrayList<>();
- this.outgoing.put(v.id(), messages);
- }
- messages.add(message);
- });
+ ((MessageScope.Global) messageScope).vertices().forEach(v -> this.outgoing.add(new Tuple2<>(v.id(), message)));
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/fca86b9e/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
index a974f63..0c10fbd 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/util/SparkHelper.java
@@ -38,16 +38,11 @@ import org.apache.tinkerpop.gremlin.process.computer.MapReduce;
import org.apache.tinkerpop.gremlin.process.computer.Memory;
import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.process.computer.VertexProgram;
-import org.apache.tinkerpop.gremlin.structure.Vertex;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertex;
import org.apache.tinkerpop.gremlin.util.iterator.IteratorUtils;
import scala.Tuple2;
import java.io.IOException;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
import java.util.Optional;
/**
@@ -61,7 +56,7 @@ public final class SparkHelper {
public static <M> JavaPairRDD<Object, SparkMessenger<M>> executeStep(final JavaPairRDD<Object, SparkMessenger<M>> graphRDD, final VertexProgram<M> globalVertexProgram, final SparkMemory memory, final Configuration apacheConfiguration) {
JavaPairRDD<Object, SparkMessenger<M>> current = graphRDD;
// execute vertex program
- current = current.mapPartitionsToPair(partitionIterator -> { // each partition(Spark)/worker(TP3) has a local copy of the vertex program
+ current = current.mapPartitionsToPair(partitionIterator -> { // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration);
workerVertexProgram.workerIterationStart(memory);
return () -> IteratorUtils.<Tuple2<Object, SparkMessenger<M>>, Tuple2<Object, SparkMessenger<M>>>map(partitionIterator, keyValue -> {
@@ -71,27 +66,13 @@ public final class SparkHelper {
});
});
- // emit messages by appending them to the graph vertices data as "message vertices"
- current = current.<Object, SparkMessenger<M>>flatMapToPair(keyValue -> () -> new Iterator<Tuple2<Object, SparkMessenger<M>>>() {
- boolean first = true;
- final Iterator<Map.Entry<Object, List<M>>> iterator = keyValue._2().getOutgoingMessages().iterator();
-
- @Override
- public boolean hasNext() {
- return this.first || this.iterator.hasNext();
- }
-
- @Override
- public Tuple2<Object, SparkMessenger<M>> next() {
- if (this.first) {
- this.first = false;
- keyValue._2().clearIncomingMessages(); // the raw vertex should not have any incoming messages (should be cleared from the previous stage)
- return new Tuple2<Object, SparkMessenger<M>>(keyValue._1(), keyValue._2()); // this is the raw vertex data
- } else {
- final Map.Entry<Object, List<M>> entry = this.iterator.next();
- return new Tuple2<Object, SparkMessenger<M>>(entry.getKey(), new SparkMessenger<>(new DetachedVertex(entry.getKey(), Vertex.DEFAULT_LABEL, Collections.emptyMap()), entry.getValue())); // these are the messages
- }
- }
+ // emit messages by appending them to the graph vertices as message "vertices"
+ current = current.<Object, SparkMessenger<M>>flatMapToPair(keyValue -> () -> {
+ keyValue._2().clearIncomingMessages(); // the graph vertex should not have any incoming messages (should be cleared from the previous stage)
+ return IteratorUtils.<Tuple2<Object, SparkMessenger<M>>>concat(
+ IteratorUtils.of(keyValue),
+ IteratorUtils.map(keyValue._2().getOutgoingMessages().iterator(), // this is the graph vertex
+ entry -> new Tuple2<>(entry._1(), SparkMessenger.forMessageVertex(entry._2(), entry._2())))); // this is a message "vertex";
});
// "message pass" via reduction joining the "message vertices" with the graph vertices
@@ -109,11 +90,12 @@ public final class SparkHelper {
return messengerA; // always reduce on the first argument
});
- // clear all previous outgoing messages
+ // clear all previous outgoing messages (why can't we do this prior to the shuffle?)
current = current.mapValues(messenger -> {
messenger.clearOutgoingMessages();
return messenger;
});
+
return current;
}