You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/25 21:10:08 UTC
incubator-tinkerpop git commit: ran numerous SparkServer jobs for
manual testing for the 3.1.1 release. Found a few issues. 1) we don't repartition
if the number of workers is greater than the number of partitions. FIXED. 2) there was
a memory leak in SparkExecutor th… [subject truncated by the archive]
Repository: incubator-tinkerpop
Updated Branches:
refs/heads/master 65e777577 -> 2ca4cba87
ran numerous SparkServer jobs for manual testing for the 3.1.1 release. Found a few issues. 1) we don't repartition if the number of workers is greater than the number of partitions. FIXED. 2) there was a memory leak in SparkExecutor that was easily solved by calling .clear() on the incoming view. I ran full integration tests for Spark and all is golden. CTR.
Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/2ca4cba8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/2ca4cba8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/2ca4cba8
Branch: refs/heads/master
Commit: 2ca4cba87d98101b404340ccd80bc44f7a417f9e
Parents: 65e7775
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Jan 25 13:10:00 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Jan 25 13:10:00 2016 -0700
----------------------------------------------------------------------
.../traversal/engine/ComputerTraversalEngine.java | 14 +++++++++++---
.../gremlin/structure/util/star/StarGraph.java | 5 ++++-
.../gremlin/spark/process/computer/SparkExecutor.java | 3 +++
.../spark/process/computer/SparkGraphComputer.java | 11 +++++++----
4 files changed, 25 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/2ca4cba8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/engine/ComputerTraversalEngine.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/engine/ComputerTraversalEngine.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/engine/ComputerTraversalEngine.java
index 1f2bc7c..8461791 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/engine/ComputerTraversalEngine.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/engine/ComputerTraversalEngine.java
@@ -68,13 +68,20 @@ public final class ComputerTraversalEngine implements TraversalEngine {
public final static class Builder implements TraversalEngine.Builder {
private Class<? extends GraphComputer> graphComputerClass;
+ private int workers = -1;
private static final List<TraversalStrategy> WITH_STRATEGIES = Collections.singletonList(ComputerResultStrategy.instance());
+
@Override
public List<TraversalStrategy> getWithStrategies() {
return WITH_STRATEGIES;
}
+ public Builder workers(final int workers) {
+ this.workers = workers;
+ return this;
+ }
+
public Builder computer(final Class<? extends GraphComputer> graphComputerClass) {
this.graphComputerClass = graphComputerClass;
return this;
@@ -82,9 +89,10 @@ public final class ComputerTraversalEngine implements TraversalEngine {
public ComputerTraversalEngine create(final Graph graph) {
- return null == this.graphComputerClass ?
- new ComputerTraversalEngine(graph.compute()) :
- new ComputerTraversalEngine(graph.compute(this.graphComputerClass));
+ final GraphComputer graphComputer = null == this.graphComputerClass ? graph.compute() : graph.compute(this.graphComputerClass);
+ if (-1 != this.workers)
+ graphComputer.workers(this.workers);
+ return new ComputerTraversalEngine(graphComputer);
}
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/2ca4cba8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
index aa45413..c613dbb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
@@ -197,7 +197,8 @@ public final class StarGraph implements Graph, Serializable {
vertex.properties().forEachRemaining(vp -> {
final VertexProperty<?> starVertexProperty = starVertex.property(VertexProperty.Cardinality.list, vp.key(), vp.value(), T.id, vp.id());
- if (supportsMetaProperties) vp.properties().forEachRemaining(p -> starVertexProperty.property(p.key(), p.value()));
+ if (supportsMetaProperties)
+ vp.properties().forEachRemaining(p -> starVertexProperty.property(p.key(), p.value()));
});
vertex.edges(Direction.IN).forEachRemaining(edge -> {
final Edge starEdge = starVertex.addInEdge(edge.label(), starGraph.addVertex(T.id, edge.outVertex().id()), T.id, edge.id());
@@ -273,6 +274,8 @@ public final class StarGraph implements Graph, Serializable {
}
public void dropEdges() {
+ if (null != this.outEdges) this.outEdges.clear();
+ if (null != this.inEdges) this.inEdges.clear();
this.outEdges = null;
this.inEdges = null;
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/2ca4cba8/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 6dd109a..087db62 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -84,9 +84,11 @@ public final class SparkExecutor {
final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
previousView.forEach(property -> property.attach(Attachable.Method.create(vertex))); // attach the view to the vertex
+ previousView.clear(); // no longer needed so kill it from memory
///
messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
+ incomingMessages.clear(); // no longer needed so kill it from memory
///
final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ? // not all vertex programs have compute keys
Collections.emptyList() :
@@ -139,6 +141,7 @@ public final class SparkExecutor {
vertex.dropVertexProperties(elementComputeKeys);
final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
view.forEach(property -> property.attach(Attachable.Method.create(vertex)));
+ view.clear(); // no longer needed so kill it from memory
return tuple._1();
});
}
http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/2ca4cba8/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 4ec3333..e682c72 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -166,8 +166,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
.newInstance()
.readGraphRDD(apacheConfiguration, sparkContext);
- if (this.workersSet && graphRDD.partitions().size() > this.workers) // ensures that the graphRDD does not have more partitions than workers
- graphRDD = graphRDD.coalesce(this.workers);
+ if (this.workersSet) {
+ if (graphRDD.partitions().size() > this.workers) // ensures that the graphRDD does not have more partitions than workers
+ graphRDD = graphRDD.coalesce(this.workers);
+ else if (graphRDD.partitions().size() < this.workers) // ensures that the graphRDD does not have less partitions than workers
+ graphRDD = graphRDD.repartition(this.workers);
+ }
// persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
if (!inputFromSpark)
graphRDD = graphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
@@ -226,7 +230,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = graphRDD.mapValues(vertexWritable -> {
vertexWritable.get().dropEdges();
return vertexWritable;
- }).cache(); // drop all the edges of the graph as they are not used in mapReduce processing
+ }); // drop all the edges of the graph as they are not used in mapReduce processing
for (final MapReduce mapReduce : this.mapReducers) {
// execute the map reduce job
final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
@@ -246,7 +250,6 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
throw new IllegalStateException(e.getMessage(), e);
}
}
- mapReduceGraphRDD.unpersist();
}
// unpersist the graphRDD if it will no longer be used