You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tinkerpop.apache.org by sp...@apache.org on 2016/10/31 17:22:06 UTC
[03/12] tinkerpop git commit: Avoid starting VP worker iterations that never end
Avoid starting VP worker iterations that never end
SparkExecutor.executeVertexProgramIteration was written in such a way
that an empty RDD partition would cause it to invoke
VertexProgram.workerIterationStart without ever invoking
VertexProgram.workerIterationEnd. This seems like a contract
violation. I have at least one VP that relies on the
workerIterationStart/workerIterationEnd pair to allocate and release
resources. Failing to invoke workerIterationEnd in this case causes a
resource leak in that VP, as it would for any VP that uses that
resource management pattern.
Project: http://git-wip-us.apache.org/repos/asf/tinkerpop/repo
Commit: http://git-wip-us.apache.org/repos/asf/tinkerpop/commit/36e1159a
Tree: http://git-wip-us.apache.org/repos/asf/tinkerpop/tree/36e1159a
Diff: http://git-wip-us.apache.org/repos/asf/tinkerpop/diff/36e1159a
Branch: refs/heads/TINKERPOP-1534
Commit: 36e1159a80f539b8bd4a884e5c1cf304ec52c4f9
Parents: b262c7e
Author: Dan LaRocque <da...@hopcount.org>
Authored: Tue Oct 25 19:37:17 2016 -0500
Committer: Dan LaRocque <da...@hopcount.org>
Committed: Tue Oct 25 20:37:17 2016 -0400
----------------------------------------------------------------------
.../gremlin/spark/process/computer/SparkExecutor.java | 6 ++++++
1 file changed, 6 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/tinkerpop/blob/36e1159a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
----------------------------------------------------------------------
diff --git a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
index 8dd2381..6e65e26 100644
--- a/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
+++ b/spark-gremlin/src/main/java/org/apache/tinkerpop/gremlin/spark/process/computer/SparkExecutor.java
@@ -91,9 +91,15 @@ public final class SparkExecutor {
// for each partition of vertices emit a view and their outgoing messages
.mapPartitionsToPair(partitionIterator -> {
KryoShimServiceLoader.applyConfiguration(graphComputerConfiguration);
+
+ // if the partition is empty, return without starting a new VP iteration
+ if (!partitionIterator.hasNext())
+ return Collections.emptyList();
+
final VertexProgram<M> workerVertexProgram = VertexProgram.createVertexProgram(HadoopGraph.open(graphComputerConfiguration), vertexProgramConfiguration); // each partition(Spark)/worker(TP3) has a local copy of the vertex program (a worker's task)
final String[] vertexComputeKeysArray = VertexProgramHelper.vertexComputeKeysAsArray(workerVertexProgram.getVertexComputeKeys()); // the compute keys as an array
final SparkMessenger<M> messenger = new SparkMessenger<>();
+
workerVertexProgram.workerIterationStart(memory.asImmutable()); // start the worker
return () -> IteratorUtils.map(partitionIterator, vertexViewIncoming -> {
final StarGraph.StarVertex vertex = vertexViewIncoming._2()._1().get(); // get the vertex from the vertex writable