Posted to commits@tinkerpop.apache.org by ok...@apache.org on 2016/10/26 14:19:01 UTC

[29/29] tinkerpop git commit: Avoid starting VP worker iterations that never end

Avoid starting VP worker iterations that never end

SparkExecutor.executeVertexProgramIteration was written in such a way
that an empty RDD partition would cause it to invoke
VertexProgram.workerIterationStart without ever invoking
VertexProgram.workerIterationEnd.  This seems like a contract
violation.  I have at least one VP that relies on
workerIterationStart|End to allocate and release resources.  Skipping
workerIterationEnd like this leaks those resources in that VP, as it
would in any VP that uses the same resource management pattern.

(cherry picked from commit 36e1159a80f539b8bd4a884e5c1cf304ec52c4f9;
 this is the same change, except that it accounts for Spark 2.0
 replacing functions that return iterables (as in Spark 1.6) with
 functions that return iterators)
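The imbalance described above can be reproduced without Spark. The sketch below is a minimal model, not TinkerPop code. It assumes that the executor calls workerIterationStart eagerly when a partition is opened, and calls workerIterationEnd from inside the lazy per-vertex mapping function once the partition iterator reports no more elements. That End call site is not visible in the diff hunk below, so treat it as an assumption. CountingWorker, lazyMap, processPartition and drain are hypothetical names; lazyMap only mimics the lazy behaviour of IteratorUtils.map.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class EmptyPartitionLeak {

    // Hypothetical stand-in for a VertexProgram: it only counts lifecycle calls.
    static final class CountingWorker {
        int starts = 0;
        int ends = 0;

        void workerIterationStart() { starts++; }

        void workerIterationEnd() { ends++; }
    }

    // Lazily applies f to each element, in the spirit of IteratorUtils.map.
    static <A, B> Iterator<B> lazyMap(Iterator<A> in, Function<A, B> f) {
        return new Iterator<B>() {
            @Override
            public boolean hasNext() { return in.hasNext(); }

            @Override
            public B next() { return f.apply(in.next()); }
        };
    }

    // Pre-fix shape (assumed): Start runs eagerly, End runs only inside the
    // mapping function, after the partition's last element has been consumed.
    static Iterator<String> processPartition(Iterator<Integer> partition, CountingWorker worker) {
        worker.workerIterationStart();
        return lazyMap(partition, v -> {
            String out = "v" + v;
            if (!partition.hasNext())
                worker.workerIterationEnd(); // never reached when the partition is empty
            return out;
        });
    }

    // Consumes the output the way a downstream Spark stage would.
    static List<String> drain(Iterator<String> it) {
        List<String> out = new ArrayList<>();
        it.forEachRemaining(out::add);
        return out;
    }

    public static void main(String[] args) {
        CountingWorker full = new CountingWorker();
        drain(processPartition(Arrays.asList(1, 2, 3).iterator(), full));
        System.out.println("non-empty partition: starts=" + full.starts + " ends=" + full.ends);

        CountingWorker empty = new CountingWorker();
        drain(processPartition(Collections.<Integer>emptyIterator(), empty));
        System.out.println("empty partition: starts=" + empty.starts + " ends=" + empty.ends);
    }
}
```

Running main prints "non-empty partition: starts=1 ends=1" followed by "empty partition: starts=1 ends=0". Because the mapping function is never invoked for an empty iterator, the End hook has no chance to run.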


Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/19e0f2fc
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/19e0f2fc
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/19e0f2fc

Branch: refs/heads/TINKERPOP-1389
Commit: 19e0f2fc1a4662364d06ff8b32095e32cce93bf3
Parents: 78936b1
Author: Dan LaRocque <da...@hopcount.org>
Authored: Tue Oct 25 19:37:17 2016 -0500
Committer: Marko A. Rodriguez <ok...@gmail.com>
Committed: Wed Oct 26 08:16:26 2016 -0600

----------------------------------------------------------------------
 .../gremlin/spark/process/computer/SparkExecutor.java          | 6 ++++++
 1 file changed, 6 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/19e0f2fc/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 8d32b36..e9372d0 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -91,9 +91,15 @@ public final class SparkExecutor {
                 // for each partition of vertices emit a view and their outgoing messages
                 .mapPartitionsToPair(partitionIterator -> {
                     KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+
+                    // if the partition is empty, return without starting a new VP iteration
+                    if (!partitionIterator.hasNext())
+                        return Collections.emptyIterator();
+
                     final VertexProgram<M> workerVertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(graphComputerConfiguration), vertexProgramConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
                     final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
                     final SparkMessenger<M> messenger = new SparkMessenger<>();
+
                     workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
                     return IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
                         final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable
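The effect of the added guard can be checked with a similar standalone model. This sketch makes the same assumption as before about where workerIterationEnd is called, and again uses hypothetical names (CountingWorker, lazyMap, processPartition, runAll) rather than Spark or TinkerPop APIs. It runs a mix of empty and non-empty partitions through one shared counter and confirms that every Start is paired with an End.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class GuardedPartition {

    // Hypothetical stand-in for a VertexProgram: it only counts lifecycle calls.
    static final class CountingWorker {
        int starts = 0;
        int ends = 0;

        void workerIterationStart() { starts++; }

        void workerIterationEnd() { ends++; }
    }

    // Lazily applies f to each element, in the spirit of IteratorUtils.map.
    static <A, B> Iterator<B> lazyMap(Iterator<A> in, Function<A, B> f) {
        return new Iterator<B>() {
            @Override
            public boolean hasNext() { return in.hasNext(); }

            @Override
            public B next() { return f.apply(in.next()); }
        };
    }

    // Post-fix shape: return before Start when there is nothing to process.
    static Iterator<String> processPartition(Iterator<Integer> partition, CountingWorker worker) {
        if (!partition.hasNext())
            return Collections.emptyIterator(); // mirrors the guard added in the diff
        worker.workerIterationStart();
        return lazyMap(partition, v -> {
            String out = "v" + v;
            if (!partition.hasNext())
                worker.workerIterationEnd(); // now reached exactly once per Start
            return out;
        });
    }

    // Runs every partition against one shared counter and drains each result.
    static CountingWorker runAll(List<List<Integer>> partitions) {
        CountingWorker worker = new CountingWorker();
        for (List<Integer> p : partitions) {
            processPartition(p.iterator(), worker).forEachRemaining(s -> { });
        }
        return worker;
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = Arrays.asList(
                Arrays.asList(1, 2),
                Collections.<Integer>emptyList(),
                Arrays.asList(3));
        CountingWorker w = runAll(partitions);
        System.out.println("starts=" + w.starts + " ends=" + w.ends);
    }
}
```

With two non-empty partitions and one empty one, main prints "starts=2 ends=2". In the actual diff, the guard sits before VertexProgram.createVertexProgram, so an empty partition also skips building its local copy of the vertex program, not just the Start call.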