You are viewing a plain-text version of this content. The canonical link to the original page was a hyperlink and is not preserved in this plain-text copy.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/10 01:21:11 UTC

incubator-tinkerpop git commit: Minimized object creation in StarGraph.addTo(). Minor reduction in object creation in SparkExecutor.

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 302f01294 -> ec99f516e


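Minimized object creation in StarGraph.addTo(): properties are now set directly on the new star element instead of being collected into an intermediate key/value list. Minor reduction in object creation in SparkExecutor: one SparkMessenger is now created per partition and reused for each vertex via setVertexAndIncomingMessages(), which also clears the outgoing messages. Also removed the RDD setName() calls in SparkExecutor and SparkGraphComputer.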


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/ec99f516
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/ec99f516
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/ec99f516

Branch: refs/heads/master
Commit: ec99f516ea1fa838c240e5ea11efaf7a0e2a4644
Parents: 302f012
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Apr 9 17:21:25 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Apr 9 17:21:25 2015 -0600

----------------------------------------------------------------------
 .../gremlin/structure/util/star/StarGraph.java  | 26 ++++++--------------
 .../process/computer/spark/SparkExecutor.java   |  7 +++---
 .../computer/spark/SparkGraphComputer.java      |  4 +--
 .../process/computer/spark/SparkMessenger.java  |  3 ++-
 4 files changed, 15 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec99f516/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
index 335bd16..654332a 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
@@ -164,32 +164,20 @@ public final class StarGraph implements Graph {
     public static Vertex addTo(final StarGraph graph, final DetachedVertex detachedVertex) {
         if (null != graph.starVertex)
             return null;
-
         graph.addVertex(T.id, detachedVertex.id(), T.label, detachedVertex.label());
         detachedVertex.properties().forEachRemaining(detachedVertexProperty -> {
-            final List<Object> keyValues = new ArrayList<>();
-            keyValues.add(T.id);
-            keyValues.add(detachedVertexProperty.id());
-            detachedVertexProperty.properties().forEachRemaining(detachedVertexPropertyProperty -> {
-                keyValues.add(detachedVertexPropertyProperty.key());
-                keyValues.add(detachedVertexPropertyProperty.value());
-            });
-            graph.starVertex.property(VertexProperty.Cardinality.list, detachedVertexProperty.key(), detachedVertexProperty.value(), keyValues.toArray());
+            final VertexProperty<?> starVertexProperty = graph.starVertex.property(VertexProperty.Cardinality.list, detachedVertexProperty.key(), detachedVertexProperty.value(), T.id, detachedVertexProperty.id());
+            detachedVertexProperty.properties().forEachRemaining(detachedVertexPropertyProperty -> starVertexProperty.property(detachedVertexPropertyProperty.key(), detachedVertexPropertyProperty.value()));
         });
         return graph.starVertex;
     }
 
     public static Edge addTo(final StarGraph graph, final DetachedEdge edge) {
-        final List<Object> keyValues = new ArrayList<>();
-        keyValues.add(T.id);
-        keyValues.add(edge.id());
-        edge.properties().forEachRemaining(property -> {
-            keyValues.add(property.key());
-            keyValues.add(property.value());
-        });
-        return !graph.starVertex.id().equals(edge.inVertex().id()) ?
-                graph.starVertex.addOutEdge(edge.label(), edge.inVertex(), keyValues.toArray()) :
-                graph.starVertex.addInEdge(edge.label(), edge.outVertex(), keyValues.toArray());
+        final Edge starEdge = !graph.starVertex.id().equals(edge.inVertex().id()) ?
+                graph.starVertex.addOutEdge(edge.label(), edge.inVertex(), T.id, edge.id()) :
+                graph.starVertex.addInEdge(edge.label(), edge.outVertex(), T.id, edge.id());
+        edge.properties().forEachRemaining(property -> starEdge.property(property.key(), property.value()));
+        return starEdge;
     }
 
     protected Long generateId() {

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec99f516/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index af5a9ff..f493c23 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -79,6 +79,7 @@ public final class SparkExecutor {
                     final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
                     final Set<String> elementComputeKeys = workerVertexProgram.getElementComputeKeys(); // the compute keys as a set
                     final String[] elementComputeKeysArray = elementComputeKeys.size() == 0 ? null : elementComputeKeys.toArray(new String[elementComputeKeys.size()]); // the compute keys as an array
+                    final SparkMessenger<M> messenger = new SparkMessenger<>();
                     workerVertexProgram.workerIterationStart(memory); // start the worker
                     return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
                         final Vertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
@@ -87,7 +88,7 @@ public final class SparkExecutor {
                         final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
                         previousView.forEach(property -> DetachedVertexProperty.addTo(vertex, property));  // attach the view to the vertex
                         ///
-                        final SparkMessenger<M> messenger = new SparkMessenger<>(vertex, incomingMessages); // create the messenger with the incoming messages
+                        messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
                         workerVertexProgram.execute(ComputerGraph.of(vertex, elementComputeKeys), messenger, memory); // execute the vertex program on this vertex for this iteration
                         ///
                         final List<DetachedVertexProperty<Object>> nextView = null == elementComputeKeysArray ?  // not all vertex programs have compute keys
@@ -98,7 +99,7 @@ public final class SparkExecutor {
                             workerVertexProgram.workerIterationEnd(memory); // if no more vertices in the partition, end the worker's iteration
                         return new Tuple2<>(vertex.id(), new ViewOutgoingPayload<>(nextView, outgoingMessages));
                     });
-                })).setName("viewOutgoingRDD");
+                }));
 
         // "message pass" by reducing on the vertex object id of the view and message payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
@@ -126,7 +127,7 @@ public final class SparkExecutor {
                         (ViewIncomingPayload<M>) payload :                    // this happens if there is a vertex with incoming messages
                         new ViewIncomingPayload<>((ViewPayload) payload));    // this happens if there is a vertex with no incoming messages
 
-        newViewIncomingRDD.setName("viewIncomingRDD")
+        newViewIncomingRDD
                 .foreachPartition(partitionIterator -> {
                 }); // need to complete a task so its BSP and the memory for this iteration is updated
         return newViewIncomingRDD;

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec99f516/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 4b063bb..0c979a7 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -211,10 +211,10 @@ public final class SparkGraphComputer implements GraphComputer {
                                 final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
                                 mapReduce.storeState(newApacheConfiguration);
                                 // map
-                                final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration).setName("mapRDD");
+                                final JavaPairRDD mapRDD = SparkExecutor.executeMap((JavaPairRDD) mapReduceGraphRDD, mapReduce, newApacheConfiguration);
                                 // combine TODO? is this really needed
                                 // reduce
-                                final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration).setName("reduceRDD") : null;
+                                final JavaPairRDD reduceRDD = (mapReduce.doStage(MapReduce.Stage.REDUCE)) ? SparkExecutor.executeReduce(mapRDD, mapReduce, newApacheConfiguration) : null;
                                 // write the map reduce output back to disk (memory)
                                 SparkExecutor.saveMapReduceRDD(null == reduceRDD ? mapRDD : reduceRDD, mapReduce, finalMemory, hadoopConfiguration);
                             }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/ec99f516/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
index 5d9223c..f3f8e33 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkMessenger.java
@@ -42,9 +42,10 @@ public final class SparkMessenger<M> implements Messenger<M> {
     private Iterable<M> incomingMessages;
     private final List<Tuple2<Object, M>> outgoingMessages = new ArrayList<>();
 
-    public SparkMessenger(final Vertex vertex, final Iterable<M> incomingMessages) {
+    public void setVertexAndIncomingMessages(final Vertex vertex, final Iterable<M> incomingMessages) {
         this.vertex = vertex;
         this.incomingMessages = incomingMessages;
+        this.outgoingMessages.clear();
     }
 
     public List<Tuple2<Object, M>> getOutgoingMessages() {