Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:04:39 UTC
[jira] [Updated] (SPARK-19023) Memory leak on GraphX with an iterative algorithm and checkpoint on the graph
[ https://issues.apache.org/jira/browse/SPARK-19023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Hyukjin Kwon updated SPARK-19023:
---------------------------------
Labels: bulk-closed memory-leak (was: memory-leak)
> Memory leak on GraphX with an iterative algorithm and checkpoint on the graph
> -----------------------------------------------------------------------------
>
> Key: SPARK-19023
> URL: https://issues.apache.org/jira/browse/SPARK-19023
> Project: Spark
> Issue Type: Bug
> Components: GraphX
> Affects Versions: 2.0.2
> Reporter: Julien MASSIOT
> Priority: Major
> Labels: bulk-closed, memory-leak
>
> I am facing OOMs within a Spark Streaming application that uses GraphX.
> While trying to reproduce the issue in a simple application, I was able to identify what appear to be two kinds of memory leaks.
>
> *Leak 1*
> It can be reproduced with this simple Scala application (which simulates, more or less, what my Spark Streaming application does; each iteration of the loop simulates one micro-batch).
> {code:title=TestGraph.scala|borderStyle=solid}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.graphx.Graph
> import org.apache.spark.rdd.RDD
> import org.apache.spark.graphx._
>
> object TestGraph {
>   case class ImpactingEvent(entityInstance: String)
>   case class ImpactedNode(entityInstance: String)
>   case class RelationInstance(relationType: String)
>
>   var impactingGraph: Graph[ImpactedNode, RelationInstance] = null
>
>   def main(args: Array[String]) {
>     val conf = new SparkConf().setAppName("TestImpactingPropagation").setMaster("local")
>     conf.set("spark.checkpoint.checkpointAllMarkedAncestors", "true")
>     val sc = new SparkContext(conf)
>     sc.setLogLevel("ERROR")
>     // a checkpoint directory is required before graph.checkpoint(); the path is arbitrary
>     sc.setCheckpointDir("/tmp/spark-checkpoint")
>
>     val vertices: RDD[(VertexId, ImpactedNode)] = sc.parallelize(Array(
>       (1L, ImpactedNode("Node1")), (2L, ImpactedNode("Node2")), (3L, ImpactedNode("Node3"))))
>
>     val edges: RDD[Edge[RelationInstance]] = sc.parallelize(Array(
>       Edge(1L, 2L, RelationInstance("Required")), Edge(1L, 2L, RelationInstance("Failover"))))
>
>     impactingGraph = Graph(vertices, edges, null)
>
>     for (x <- 1 to 10) {
>       impactingGraph = propagateEvent(impactingGraph, ImpactingEvent("node1"), sc)
>
>       impactingGraph.checkpoint()
>       impactingGraph.edges.count()
>       impactingGraph.vertices.count()
>     }
>     println("Hello")
>     Thread.sleep(10000000)
>   }
>
>   private def propagateEvent(impactingGraph: Graph[ImpactedNode, RelationInstance], event: ImpactingEvent, sc: SparkContext): Graph[ImpactedNode, RelationInstance] = {
>     val graph = impactingGraph.mapVertices((id, node) => node).cache()
>     impactingGraph.unpersist(true)
>     graph.cache()
>   }
> }
> {code}
>
> In this simple application, I just apply a mapVertices transformation to the graph and then checkpoint it, and I repeat this operation 10 times.
> After the application finishes the loop, I take a heap dump.
>
> In this heap dump, I can see 11 "live" GraphImpl instances in memory.
> My expectation is to have only one (the one referenced by the global variable impactingGraph).
>
> The "leak" comes from the f function of a MapPartitionsRDD (which is referenced by the partitionsRDD variable of my VertexRDD).
> This f function holds an outer reference to the graph created in the previous iteration.
> I can see that in the clearDependencies function of MapPartitionsRDD, the f function is not reset to null.
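> The retention pattern itself can be reproduced without Spark at all. The following is a minimal, hypothetical sketch (Stage and its members are invented for illustration; they are not Spark classes), showing how a function field that closes over its predecessor pins the whole chain until the field is cleared:
> {code:title=RetentionSketch.scala|borderStyle=solid}
> object RetentionSketch extends App {
>   // Each Stage holds a function field; the function closes over the
>   // previous Stage, so every Stage transitively retains all of its
>   // predecessors until the field is nulled out.
>   class Stage(val payload: Array[Byte], var f: () => Int) {
>     def clearDependencies(): Unit = { f = null } // analogous to nulling f in clearDependencies
>   }
>
>   def next(prev: Stage): Stage =
>     // the closure captures prev, just as the mapVertices closure keeps
>     // an outer reference to the graph of the previous iteration
>     new Stage(new Array[Byte](1024 * 1024), () => prev.payload.length)
>
>   var s = new Stage(new Array[Byte](1024 * 1024), () => 0)
>   for (_ <- 1 to 10) {
>     s = next(s)
>     s.clearDependencies() // without this line, all 11 Stages stay reachable from s
>   }
> }
> {code}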
>
> When looking into similar issues, I found this pull request:
> [https://github.com/apache/spark/pull/3545]
> In that pull request, the f variable is reset to null in the clearDependencies method of ZippedPartitionsRDD.
> I do not understand why the same is not done in MapPartitionsRDD.
> I tried patching spark-core to set f to null in clearDependencies of MapPartitionsRDD, and it solved the leak in this simple use case.
> Don't you think the f variable should also be reset to null in MapPartitionsRDD?
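> For reference, a sketch of the patch I tried (this is not actual Spark code; in Spark 2.0.x, f is a constructor parameter of MapPartitionsRDD and would have to become a var for this to compile, mirroring what apache/spark#3545 does for ZippedPartitionsRDD):
> {code:title=MapPartitionsRDD.scala (patched, sketch)|borderStyle=solid}
> override def clearDependencies() {
>   super.clearDependencies()
>   prev = null
>   f = null // drop the closure so it no longer pins the graph of the previous iteration
> }
> {code}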
> *Leak 2*
> Now I do the same, but in the propagateEvent method, in addition to the mapVertices, I also do a joinVertices on the graph.
> It can be found in the following application:
> {code:title=TestGraph.scala|borderStyle=solid}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.graphx.Graph
> import org.apache.spark.rdd.RDD
> import org.apache.spark.graphx._
>
> object TestGraph {
>   case class ImpactingEvent(entityInstance: String)
>   case class ImpactedNode(entityInstance: String)
>   case class RelationInstance(relationType: String)
>
>   var impactingGraph: Graph[ImpactedNode, RelationInstance] = null
>
>   def main(args: Array[String]) {
>     val conf = new SparkConf().setAppName("TestImpactingPropagation").setMaster("local")
>     conf.set("spark.checkpoint.checkpointAllMarkedAncestors", "true")
>     val sc = new SparkContext(conf)
>     sc.setLogLevel("ERROR")
>     // a checkpoint directory is required before graph.checkpoint(); the path is arbitrary
>     sc.setCheckpointDir("/tmp/spark-checkpoint")
>
>     val vertices: RDD[(VertexId, ImpactedNode)] = sc.parallelize(Array(
>       (1L, ImpactedNode("Node1")), (2L, ImpactedNode("Node2")), (3L, ImpactedNode("Node3"))))
>
>     val edges: RDD[Edge[RelationInstance]] = sc.parallelize(Array(
>       Edge(1L, 2L, RelationInstance("Required")), Edge(1L, 2L, RelationInstance("Failover"))))
>
>     impactingGraph = Graph(vertices, edges, null)
>
>     for (x <- 1 to 10) {
>       impactingGraph = propagateEvent(impactingGraph, ImpactingEvent("node1"), sc)
>
>       impactingGraph.checkpoint()
>       impactingGraph.edges.count()
>       impactingGraph.vertices.count()
>     }
>     println("Hello")
>     Thread.sleep(10000000)
>   }
>
>   private def propagateEvent(impactingGraph: Graph[ImpactedNode, RelationInstance], event: ImpactingEvent, sc: SparkContext): Graph[ImpactedNode, RelationInstance] = {
>     var graph = impactingGraph.mapVertices((id, node) => node).cache()
>     val verticesToJoin: RDD[(VertexId, String)] = sc.parallelize(Array((1L, "Node1"), (2L, "Node2")))
>     graph = graph.joinVertices(verticesToJoin)((id, src, toJoin) => src)
>     impactingGraph.unpersist(true)
>     graph.cache()
>   }
> }
> {code}
> When running this application and taking a memory dump, I can still see 11 "live" GraphImpl instances in memory (where I expect only one), even with the patch described in the previous section.
> When analyzing this dump, I can see that the "leak" comes from a reference to an array of partitions held by the "partitions_" variable of the EdgeRDD (this array of partitions contains a reference to the MapPartitionsRDD that references the graph created by the previous iteration, similarly to what is described in the *Leak 1* section).
> This array of partitions is referenced twice:
> * once by the "partitions_" variable of the partitionsRDD embedded within the EdgeRDD
> * once by the "partitions_" variable of the EdgeRDD itself
> This comes from the getPartitions method of EdgeRDD:
> {code:title=EdgeRDD.scala|borderStyle=solid}
> override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
> {code}
> After checkpoint and count are called on the graph's edges, the reference to this array is cleared within the partitionsRDD of the EdgeRDD.
> It is done through this call:
> {code:title=RDD.scala|borderStyle=solid}
> /**
>  * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
>  * created from the checkpoint file, and forget its old dependencies and partitions.
>  */
> private[spark] def markCheckpointed(): Unit = {
>   clearDependencies()
>   partitions_ = null
>   deps = null // Forget the constructor argument for dependencies too
> }
> {code}
> But this is not done for the "partitions_" variable of the EdgeRDD itself.
> Indeed, markCheckpointed() is not called on the EdgeRDD itself, only on the partitionsRDD embedded within it.
> Because of that, we still hold a reference to this array of partitions (which references a MapPartitionsRDD that references the graph of the previous iteration).
> I can avoid this leak by calling checkpoint and count on the edges right after the mapVertices (and before the joinVertices), provided the patch described in the previous section is applied to MapPartitionsRDD.
> But that workaround does not seem clean to me.
> In my mind:
> * either the "partitions_" variable of the EdgeRDD should be reset to null after a checkpoint is called on the graph,
> * or the "partitions_" variable of the EdgeRDD should not reference the same array of partitions as the one referenced by the "partitions_" variable of the partitionsRDD (I don't know whether this "partitions_" is really useful on the EdgeRDD).
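> A hypothetical sketch of the first option (the method name and its placement are my assumptions, not actual Spark code, and "partitions_" is private to RDD, so the real fix would need a proper hook there):
> {code:title=EdgeRDDImpl.scala (sketch)|borderStyle=solid}
> // Hypothetical: once the embedded partitionsRDD has been checkpointed,
> // also drop the wrapper's cached partitions array, so the stale
> // MapPartitionsRDD partitions are no longer reachable through the EdgeRDD.
> private[graphx] def clearCachedPartitions(): Unit = {
>   partitions_ = null // would require an accessor on RDD; sketch only
> }
> {code}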
> What do you think?
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)