You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2015/04/02 19:29:43 UTC
[3/3] incubator-tinkerpop git commit: a few object creation optimizations.
a few object creation optimizations.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/4ef87868
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/4ef87868
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/4ef87868
Branch: refs/heads/master
Commit: 4ef8786867daf0a0618387185d85d64302ef860d
Parents: 4155907
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Thu Apr 2 11:22:14 2015 -0600
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Thu Apr 2 11:29:38 2015 -0600
----------------------------------------------------------------------
.../process/computer/spark/SparkExecutor.java | 17 +++++++----------
.../process/computer/spark/SparkGraphComputer.java | 11 +++++------
.../spark/payload/ViewIncomingPayload.java | 14 ++++++++++----
3 files changed, 22 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ef87868/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
index 4246f42..512c0e0 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkExecutor.java
@@ -74,7 +74,7 @@ public final class SparkExecutor {
final JavaPairRDD<Object, ViewOutgoingPayload<M>> viewOutgoingRDD = ((null == viewIncomingRDD) ?
graphRDD.mapValues(vertexWritable -> new Tuple2<>(vertexWritable, Optional.<ViewIncomingPayload<M>>absent())) : // first iteration will not have any views or messages
- graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
+ graphRDD.leftOuterJoin(viewIncomingRDD)) // every other iteration may have views and messages
// for each partition of vertices
.mapPartitionsToPair(partitionIterator -> {
final VertexProgram<M> workerVertexProgram = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program to reduce object creation
@@ -103,7 +103,7 @@ public final class SparkExecutor {
// "message pass" by reducing on the vertex object id of the message payloads
final MessageCombiner<M> messageCombiner = VertexProgram.<VertexProgram<M>>createVertexProgram(apacheConfiguration).getMessageCombiner().orElse(null);
- final JavaPairRDD<Object, Payload> newViewIncomingRDD = viewOutgoingRDD
+ final JavaPairRDD<Object, ViewIncomingPayload<M>> newViewIncomingRDD = viewOutgoingRDD
.flatMapToPair(tuple -> () -> IteratorUtils.<Tuple2<Object, Payload>>concat(
IteratorUtils.of(new Tuple2<>(tuple._1(), tuple._2().getView())),
IteratorUtils.map(tuple._2().getOutgoingMessages().iterator(), message -> new Tuple2<>(message._1(), new MessagePayload<>(message._2())))))
@@ -125,7 +125,7 @@ public final class SparkExecutor {
throw new IllegalStateException("It should never be the case that two views reduce to the same key");
return b;
} else {
- final ViewIncomingPayload<M> c = new ViewIncomingPayload<>();
+ final ViewIncomingPayload<M> c = new ViewIncomingPayload<>(messageCombiner);
if (a instanceof MessagePayload)
c.addIncomingMessage(((MessagePayload<M>) a).getMessage(), messageCombiner);
else if (a instanceof ViewPayload)
@@ -139,20 +139,17 @@ public final class SparkExecutor {
})
.mapValues(payload -> {
if (payload instanceof ViewIncomingPayload)
- return payload;
- else {
+ return (ViewIncomingPayload<M>) payload;
+ else { // this means the vertex has no incoming messages
final ViewIncomingPayload<M> viewIncomingPayload = new ViewIncomingPayload<>();
- if (payload instanceof ViewPayload)
- viewIncomingPayload.setView(((ViewPayload) payload).getView());
- else
- throw new IllegalStateException("It should never be the case that a view is not emitted");
+ viewIncomingPayload.setView(((ViewPayload) payload).getView());
return viewIncomingPayload;
}
});
newViewIncomingRDD.foreachPartition(partitionIterator -> {
}); // need to complete a task so its BSP and the memory for this iteration is updated
- return (JavaPairRDD) newViewIncomingRDD;
+ return newViewIncomingRDD;
}
/////////////////
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ef87868/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
index 8c63f8a..1df99d3 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/SparkGraphComputer.java
@@ -44,14 +44,12 @@ import org.apache.tinkerpop.gremlin.process.computer.util.DefaultComputerResult;
import org.apache.tinkerpop.gremlin.process.computer.util.GraphComputerHelper;
import org.apache.tinkerpop.gremlin.process.computer.util.MapMemory;
import org.apache.tinkerpop.gremlin.structure.util.StringFactory;
-import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.Tuple2;
import java.io.File;
import java.util.HashSet;
-import java.util.List;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -164,8 +162,8 @@ public final class SparkGraphComputer implements GraphComputer {
(Class<InputFormat<NullWritable, VertexWritable>>) hadoopConfiguration.getClass(Constants.GREMLIN_HADOOP_GRAPH_INPUT_FORMAT, InputFormat.class),
NullWritable.class,
VertexWritable.class)
- .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get()))) // TODO: use DetachedVertex?
- .reduceByKey((a, b) -> a) // TODO: test without doing this reduce
+ .mapToPair(tuple -> new Tuple2<>(tuple._2().get().id(), new VertexWritable(tuple._2().get())))
+ .reduceByKey((a, b) -> a) // TODO: why is this necessary?
.cache(); // partition the graph across the cluster
JavaPairRDD<Object, ViewIncomingPayload<Object>> viewIncomingRDD = null;
@@ -207,9 +205,10 @@ public final class SparkGraphComputer implements GraphComputer {
if (!this.mapReducers.isEmpty()) {
// drop all edges and messages in the graphRDD as they are no longer needed for the map reduce jobs
final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = SparkExecutor.prepareGraphRDDForMapReduce(graphRDD, viewIncomingRDD).cache();
- // graphRDD.unpersist(); // the original graphRDD is no longer needed so free up its memory
-
+ // TODO: boolean first = true;
for (final MapReduce mapReduce : this.mapReducers) {
+ // TODO: if (first) first = false;
+ // TODO: else graphRDD.unpersist(); // the original graphRDD is no longer needed so free up its memory
// execute the map reduce job
final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
mapReduce.storeState(newApacheConfiguration);
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/4ef87868/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
----------------------------------------------------------------------
diff --git a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
index be91b2e..2de9736 100644
--- a/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
+++ b/hadoop-gremlin/src/main/java/org/apache/tinkerpop/gremlin/hadoop/process/computer/spark/payload/ViewIncomingPayload.java
@@ -25,6 +25,7 @@ import org.apache.tinkerpop.gremlin.process.computer.MessageCombiner;
import org.apache.tinkerpop.gremlin.structure.util.detached.DetachedVertexProperty;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
/**
@@ -33,13 +34,18 @@ import java.util.List;
public class ViewIncomingPayload<M> implements Payload {
private List<DetachedVertexProperty<Object>> view = null;
- private final List<M> incomingMessages = new ArrayList<>();
+ private final List<M> incomingMessages;
- public ViewIncomingPayload() {
+ public ViewIncomingPayload(final MessageCombiner<M> messageCombiner) {
+ this.incomingMessages = null == messageCombiner ? new ArrayList<>() : new ArrayList<>(1);
+ }
+
+ public ViewIncomingPayload() {
+ this.incomingMessages = null;
}
public List<DetachedVertexProperty<Object>> getView() {
- return this.view;
+ return null == this.view ? Collections.emptyList() : this.view;
}
public void setView(final List<DetachedVertexProperty<Object>> view) {
@@ -47,7 +53,7 @@ public class ViewIncomingPayload<M> implements Payload {
}
public List<M> getIncomingMessages() {
- return incomingMessages;
+ return null == this.incomingMessages ? Collections.emptyList() : this.incomingMessages;
}
public void addIncomingMessage(final M message, final MessageCombiner<M> messageCombiner) {