You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/01/25 21:10:08 UTC

incubator-tinkerpop git commit: ran numerous SparkServer jobs for manual testing for the 3.1.1 release. Found a few issues. 1) we don't repartition if the number of workers is greater than the number of partitions. FIXED. 2) there was a memory leak in SparkExecutor th… (subject truncated; full message below)

Repository: incubator-tinkerpop
Updated Branches:
  refs/heads/master 65e777577 -> 2ca4cba87


Ran numerous SparkServer jobs for manual testing for the 3.1.1 release. Found a few issues: 1) we didn't repartition when the number of workers was greater than the number of partitions. FIXED. 2) there was a memory leak in SparkExecutor that was easily solved with a .clear() on the incoming view. I ran full integration tests for Spark and all is golden. CTR.


Project: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/commit/2ca4cba8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/tree/2ca4cba8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/diff/2ca4cba8

Branch: refs/heads/master
Commit: 2ca4cba87d98101b404340ccd80bc44f7a417f9e
Parents: 65e7775
Author: Marko A. Rodriguez <ok...@gmail.com>
Authored: Mon Jan 25 13:10:00 2016 -0700
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Mon Jan 25 13:10:00 2016 -0700

----------------------------------------------------------------------
 .../traversal/engine/ComputerTraversalEngine.java     | 14 +++++++++++---
 .../gremlin/structure/util/star/StarGraph.java        |  5 ++++-
 .../gremlin/spark/process/computer/SparkExecutor.java |  3 +++
 .../spark/process/computer/SparkGraphComputer.java    | 11 +++++++----
 4 files changed, 25 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/2ca4cba8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/engine/ComputerTraversalEngine.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/engine/ComputerTraversalEngine.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/engine/ComputerTraversalEngine.java
index 1f2bc7c..8461791 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/engine/ComputerTraversalEngine.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/process/traversal/engine/ComputerTraversalEngine.java
@@ -68,13 +68,20 @@ public final class ComputerTraversalEngine implements TraversalEngine {
     public final static class Builder implements TraversalEngine.Builder {
 
         private Class<? extends GraphComputer> graphComputerClass;
+        private int workers = -1;
         private static final List<TraversalStrategy> WITH_STRATEGIES = Collections.singletonList(ComputerResultStrategy.instance());
 
+
         @Override
         public List<TraversalStrategy> getWithStrategies() {
             return WITH_STRATEGIES;
         }
 
+        public Builder workers(final int workers) {
+            this.workers = workers;
+            return this;
+        }
+
         public Builder computer(final Class<? extends GraphComputer> graphComputerClass) {
             this.graphComputerClass = graphComputerClass;
             return this;
@@ -82,9 +89,10 @@ public final class ComputerTraversalEngine implements TraversalEngine {
 
 
         public ComputerTraversalEngine create(final Graph graph) {
-            return null == this.graphComputerClass ?
-                    new ComputerTraversalEngine(graph.compute()) :
-                    new ComputerTraversalEngine(graph.compute(this.graphComputerClass));
+            final GraphComputer graphComputer = null == this.graphComputerClass ? graph.compute() : graph.compute(this.graphComputerClass);
+            if (-1 != this.workers)
+                graphComputer.workers(this.workers);
+            return new ComputerTraversalEngine(graphComputer);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/2ca4cba8/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
----------------------------------------------------------------------
diff --git a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
index aa45413..c613dbb 100644
--- a/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
+++ b/gremlin-core/src/main/java/org/apache/tinkerpop/gremlin/structure/util/star/StarGraph.java
@@ -197,7 +197,8 @@ public final class StarGraph implements Graph, Serializable {
 
         vertex.properties().forEachRemaining(vp -> {
             final VertexProperty<?> starVertexProperty = starVertex.property(VertexProperty.Cardinality.list, vp.key(), vp.value(), T.id, vp.id());
-            if (supportsMetaProperties) vp.properties().forEachRemaining(p -> starVertexProperty.property(p.key(), p.value()));
+            if (supportsMetaProperties)
+                vp.properties().forEachRemaining(p -> starVertexProperty.property(p.key(), p.value()));
         });
         vertex.edges(Direction.IN).forEachRemaining(edge -> {
             final Edge starEdge = starVertex.addInEdge(edge.label(), starGraph.addVertex(T.id, edge.outVertex().id()), T.id, edge.id());
@@ -273,6 +274,8 @@ public final class StarGraph implements Graph, Serializable {
         }
 
         public void dropEdges() {
+            if (null != this.outEdges) this.outEdges.clear();
+            if (null != this.inEdges) this.inEdges.clear();
             this.outEdges = null;
             this.inEdges = null;
         }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/2ca4cba8/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 6dd109a..087db62 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -84,9 +84,11 @@ public final class SparkExecutor {
                         final List<DetachedVertexProperty<Object>> previousView = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getView() : Collections.emptyList();
                         final List<M> incomingMessages = hasViewAndMessages ? vertexViewIncoming._2()._2().get().getIncomingMessages() : Collections.emptyList();
                         previousView.forEach(property -> property.attach(Attachable.Method.create(vertex)));  // attach the view to the vertex
+                        previousView.clear(); // no longer needed so kill it from memory
                         ///
                         messenger.setVertexAndIncomingMessages(vertex, incomingMessages); // set the messenger with the incoming messages
                         workerVertexProgram.execute(ComputerGraph.vertexProgram(vertex, workerVertexProgram), messenger, memory); // execute the vertex program on this vertex for this iteration
+                        incomingMessages.clear(); // no longer needed so kill it from memory
                         ///
                         final List<DetachedVertexProperty<Object>> nextView = elementComputeKeysArray.length == 0 ?  // not all vertex programs have compute keys
                                 Collections.emptyList() :
@@ -139,6 +141,7 @@ public final class SparkExecutor {
                     vertex.dropVertexProperties(elementComputeKeys);
                     final List<DetachedVertexProperty<Object>> view = tuple._2().isPresent() ? tuple._2().get().getView() : Collections.emptyList();
                     view.forEach(property -> property.attach(Attachable.Method.create(vertex)));
+                    view.clear(); // no longer needed so kill it from memory
                     return tuple._1();
                 });
     }

http://git-wip-us.apache.org/repos/asf/incubator-tinkerpop/blob/2ca4cba8/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
index 4ec3333..e682c72 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkGraphComputer.java
@@ -166,8 +166,12 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     graphRDD = hadoopConfiguration.getClass(Constants.GREMLIN_SPARK_GRAPH_INPUT_RDD, InputFormatRDD.class, InputRDD.class)
                             .newInstance()
                             .readGraphRDD(apacheConfiguration, sparkContext);
-                    if (this.workersSet && graphRDD.partitions().size() > this.workers) // ensures that the graphRDD does not have more partitions than workers
-                        graphRDD = graphRDD.coalesce(this.workers);
+                    if (this.workersSet) {
+                        if (graphRDD.partitions().size() > this.workers) // ensures that the graphRDD does not have more partitions than workers
+                            graphRDD = graphRDD.coalesce(this.workers);
+                        else if (graphRDD.partitions().size() < this.workers) // ensures that the graphRDD does not have less partitions than workers
+                            graphRDD = graphRDD.repartition(this.workers);
+                    }
                     // persist the vertex program loaded graph as specified by configuration or else use default cache() which is MEMORY_ONLY
                     if (!inputFromSpark)
                         graphRDD = graphRDD.persist(StorageLevel.fromString(hadoopConfiguration.get(Constants.GREMLIN_SPARK_GRAPH_STORAGE_LEVEL, "MEMORY_ONLY")));
@@ -226,7 +230,7 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                     final JavaPairRDD<Object, VertexWritable> mapReduceGraphRDD = graphRDD.mapValues(vertexWritable -> {
                         vertexWritable.get().dropEdges();
                         return vertexWritable;
-                    }).cache();  // drop all the edges of the graph as they are not used in mapReduce processing
+                    });  // drop all the edges of the graph as they are not used in mapReduce processing
                     for (final MapReduce mapReduce : this.mapReducers) {
                         // execute the map reduce job
                         final HadoopConfiguration newApacheConfiguration = new HadoopConfiguration(apacheConfiguration);
@@ -246,7 +250,6 @@ public final class SparkGraphComputer extends AbstractHadoopGraphComputer {
                             throw new IllegalStateException(e.getMessage(), e);
                         }
                     }
-                    mapReduceGraphRDD.unpersist();
                 }
 
                 // unpersist the graphRDD if it will no longer be used