You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/10 01:21:11 UTC
incubator-tinkerpop git commit: Minimized object creation in StarGraph.addTo(). Minor reduction in object creation in SparkExecutor.
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 302f01294 -> ec99f516e
Minimized object creation in StarGraph.addTo(). Minor reduction in object creation in SparkExecutor.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ec99f516
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ec99f516
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ec99f516
Branch: refs/heads/master
Commit: ec99f516ea1fa838c240e5ea11efaf7a0e2a4644
Parents: 302f012
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Apr 9 17:21:25 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Apr 9 17:21:25 2015 -0600
----------------------------------------------------------------------
.../gremlin/structure/util/star/StarGraph.java | 26 ++++++--------------
.../process/computer/spark/SparkExecutor.java | 7 +++---
.../computer/spark/SparkGraphComputer.java | 4 +--
.../process/computer/spark/SparkMessenger.java | 3 ++-
4 files changed, 15 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec99f516/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
index 335bd16..654332a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
@@ -164,32 +164,20 @@ public final class StarGraph implements Graph {
public static Vertex addTo(final StarGraph graph, final DetachedVertex detachedVertex) {
if (null != graph.starVertex)
return null;
-
graph.addVertex(T.id, detachedVertex.id(), T.label, detachedVertex.label());
detachedVertex.properties().forEachRemaining(detachedVertexProperty -> {
- final List<Object> keyValues = new ArrayList<>();
- keyValues.add(T.id);
- keyValues.add(detachedVertexProperty.id());
- detachedVertexProperty.properties().forEachRemaining(detachedVertexPropertyProperty -> {
- keyValues.add(detachedVertexPropertyProperty.key());
- keyValues.add(detachedVertexPropertyProperty.value());
- });
- graph.starVertex.property(VertexProperty.Cardinality.list, detachedVertexProperty.key(), detachedVertexProperty.value(), keyValues.toArray());
+ final VertexProperty<?> starVertexProperty = graph.starVertex.property(VertexProperty.Cardinality.list, detachedVertexProperty.key(), detachedVertexProperty.value(), T.id, detachedVertexProperty.id());
+ detachedVertexProperty.properties().forEachRemaining(detachedVertexPropertyProperty -> starVertexProperty.property(detachedVertexPropertyProperty.key(), detachedVertexPropertyProperty.value()));
});
return graph.starVertex;
}
public static Edge addTo(final StarGraph graph, final DetachedEdge edge) {
- final List<Object> keyValues = new ArrayList<>();
- keyValues.add(T.id);
- keyValues.add(edge.id());
- edge.properties().forEachRemaining(property -> {
- keyValues.add(property.key());
- keyValues.add(property.value());
- });
- return !graph.starVertex.id().equals(edge.inVertex().id()) ?
- graph.starVertex.addOutEdge(edge.label(), edge.inVertex(), keyValues.toArray()) :
- graph.starVertex.addInEdge(edge.label(), edge.outVertex(), keyValues.toArray());
+ final Edge starEdge = !graph.starVertex.id().equals(edge.inVertex().id()) ?
+ graph.starVertex.addOutEdge(edge.label(), edge.inVertex(), T.id, edge.id()) :
+ graph.starVertex.addInEdge(edge.label(), edge.outVertex(), T.id, edge.id());
+ edge.properties().forEachRemaining(property -> starEdge.property(property.key(), property.value()));
+ return starEdge;
}
protected Long generateId() {
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec99f516/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index af5a9ff..f493c23 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -79,6 +79,7 @@ public final class SparkExecutor {
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
+ final SparkMessenger<M> messenger = new SparkMessenger<>();
workerVertexProgram.workerIterationStart(memory); // start the worker
return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
final Vertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
@@ -87,7 +88,7 @@ public final class SparkExecutor {
final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
previousView.forEach(property -> DetachedVertexProperty.addTo(vertex, property)); // attach the view to the vertex
///
- final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages); // create the messenger with the incoming messages
+ messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory); // execute the vertex program on this vertex for this iteration
///
final List<DetachedVertexProperty<Object>> nextView = null == elementComputeKeysArray ? // not all vertex programs have compute keys
@@ -98,7 +99,7 @@ public final class SparkExecutor {
workerVertexProgram.workerIterationEnd(memory); // if no more vertices in the partition, end the worker's iteration
return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
});
- })).setName("viewOutgoingRDD");
+ }));
// "message pass" by reducing on the vertex object id of the view and message payloads
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
@@ -126,7 +127,7 @@ public final class SparkExecutor {
(ViewIncomingPayload<M>) payload : // this happens if there is a vertex with incoming messages
new ViewIncomingPayload<>((ViewPayload) payload)); // this happens if there is a vertex with no incoming messages
- newViewIncomingRDD.setName("viewIncomingRDD")
+ newViewIncomingRDD
.foreachPartition(partitionIterator -> {
}); // need to complete a task so its BSP and the memory for this iteration is updated
return newViewIncomingRDD;
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec99f516/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 4b063bb..0c979a7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -211,10 +211,10 @@ public final class SparkGraphComputer implements GraphComputer {
final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
mapReduce.storeState(newApacheConfiguration);
// map
- final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
+ final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration);
// combine TODO? is this really needed
// reduce
- final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
+ final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
// write the map reduce output back to disk (memory)
SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec99f516/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index 5d9223c..f3f8e33 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -42,9 +42,10 @@ public final class SparkMessenger<M> implements Messenger<M> {
private Iterable<M> incomingMessages;
private final List<Tuple2<Object, M>> outgoingMessages = new ArrayList<>();
- public SparkMessenger(final Vertex vertex, final Iterable<M> incomingMessages) {
+ public void setVertexAndIncomingMessages(final Vertex vertex, final Iterable<M> incomingMessages) {
this.vertex = vertex;
this.incomingMessages = incomingMessages;
+ this.outgoingMessages.clear();
}
public List<Tuple2<Object, M>> getOutgoingMessages() {