You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/02 19:29:43 UTC

[3/3] incubator-tinkerpop git commit: a few object creation optimizations.

a few object creation optimizations.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4ef87868
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4ef87868
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4ef87868

Branch: refs/heads/master
Commit: 4ef8786867daf0a0618387185d85d64302ef860d
Parents: 4155907
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Apr 2 11:22:14 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Apr 2 11:29:38 2015 -0600

----------------------------------------------------------------------
 .../process/computer/spark/SparkExecutor.java      | 17 +++++++----------
 .../process/computer/spark/SparkGraphComputer.java | 11 +++++------
 .../spark/payload/ViewIncomingPayload.java         | 14 ++++++++++----
 3 files changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ef87868/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 4246f42..512c0e0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -74,7 +74,7 @@ public final class SparkExecutor {
 
         final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = ((null == viewIncomingRDD) ?
                 graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
-                graphRDD.leftOuterJoin(viewIncomingRDD))                                                                                                    // every other iteration may have views and messages
+                graphRDD.leftOuterJoin(viewIncomingRDD))                                                                        // every other iteration may have views and messages
                 // for each partition of vertices
                 .mapPartitionsToPair(partitionIterator -> {
                     final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
@@ -103,7 +103,7 @@ public final class SparkExecutor {
 
         // "message pass" by reducing on the vertex object id of the message payloads
         final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
-        final JavaPairRDD<Object, Payload> newViewIncomingRDD = viewOutgoingRDD
+        final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
                 .flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
                         IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),
                         IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))
@@ -125,7 +125,7 @@ public final class SparkExecutor {
                             throw new IllegalStateException("It should never be the case that two views reduce to the same key");
                         return b;
                     } else {
-                        final ViewIncomingPayload<M> c = new ViewIncomingPayload<>();
+                        final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
                         if (a instanceof MessagePayload)
                             c.addIncomingMessage(((MessagePayload<M>) a).getMessage(), messageCombiner);
                         else if (a instanceof ViewPayload)
@@ -139,20 +139,17 @@ public final class SparkExecutor {
                 })
                 .mapValues(payload -> {
                     if (payload instanceof ViewIncomingPayload)
-                        return payload;
-                    else {
+                        return (ViewIncomingPayload<M>) payload;
+                    else {  // this means the vertex has no incoming messages
                         final ViewIncomingPayload<M> viewIncomingPayload = new ViewIncomingPayload<>();
-                        if (payload instanceof ViewPayload)
-                            viewIncomingPayload.setView(((ViewPayload) payload).getView());
-                        else
-                            throw new IllegalStateException("It should never be the case that a view is not emitted");
+                        viewIncomingPayload.setView(((ViewPayload) payload).getView());
                         return viewIncomingPayload;
                     }
                 });
 
         newViewIncomingRDD.foreachPartition(partitionIterator -> {
         }); // need to complete a task so its BSP and the memory for this iteration is updated
-        return (JavaPairRDD) newViewIncomingRDD;
+        return newViewIncomingRDD;
     }
 
     /////////////////

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ef87868/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 8c63f8a..1df99d3 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -44,14 +44,12 @@ import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
 import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
 import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
 import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
 import java.io.File;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
@@ -164,8 +162,8 @@ public final class SparkGraphComputer implements GraphComputer {
                                 (Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
                                 NullWritable.class,
                                 VertexWritable.class)
-                                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get()))) // TODO: use DetachedVertex?
-                                .reduceByKey((a, b) -> a) // TODO: test without doing this reduce
+                                .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
+                                .reduceByKey((a, b) -> a) // TODO: why is this necessary?
                                 .cache(); // partition the graph across the cluster
                         JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
 
@@ -207,9 +205,10 @@ public final class SparkGraphComputer implements GraphComputer {
                         if (!this.mapReducers.isEmpty()) {
                             // drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
                             final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD).cache();
-                            // graphRDD.unpersist(); // the original graphRDD is no longer needed so free up its memory
-
+                            // TODO: boolean first = true;
                             for (final MapReduce mapReduce : this.mapReducers) {
+                                // TODO: if (first) first = false;
+                                // TODO: else graphRDD.unpersist();  // the original graphRDD is no longer needed so free up its memory
                                 // execute the map reduce job
                                 final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
                                 mapReduce.storeState(newApacheConfiguration);

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ef87868/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
index be91b2e..2de9736 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
@@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
 import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 
 /**
@@ -33,13 +34,18 @@ import java.util.List;
 public class ViewIncomingPayload<M> implements Payload {
 
     private List<DetachedVertexProperty<Object>> view = null;
-    private final List<M> incomingMessages = new ArrayList<>();
+    private final List<M> incomingMessages;
 
-    public ViewIncomingPayload() {
+    public ViewIncomingPayload(final MessageCombiner<M> messageCombiner) {
+        this.incomingMessages = null == messageCombiner ? new ArrayList<>() : new ArrayList<>(1);
+    }
+
+    public ViewIncomingPayload()  {
+        this.incomingMessages = null;
     }
 
     public List<DetachedVertexProperty<Object>> getView() {
-        return this.view;
+        return null == this.view ? Collections.emptyList() : this.view;
     }
 
     public void setView(final List<DetachedVertexProperty<Object>> view) {
@@ -47,7 +53,7 @@ public class ViewIncomingPayload<M> implements Payload {
     }
 
     public List<M> getIncomingMessages() {
-        return incomingMessages;
+        return null == this.incomingMessages ? Collections.emptyList() : this.incomingMessages;
     }
 
     public void addIncomingMessage(final M message, final MessageCombiner<M> messageCombiner) {