Posted to issues@spark.apache.org by "Hyukjin Kwon (JIRA)" <ji...@apache.org> on 2019/05/21 04:04:39 UTC

[jira] [Updated] (SPARK-19023) Memory leak on GraphX with an iterative algorithm and checkpoint on the graph

     [ https://issues.apache.org/jira/browse/SPARK-19023?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Hyukjin Kwon updated SPARK-19023:
---------------------------------
    Labels: bulk-closed memory-leak  (was: memory-leak)

> Memory leak on GraphX with an iterative algorithm and checkpoint on the graph
> -----------------------------------------------------------------------------
>
>                 Key: SPARK-19023
>                 URL: https://issues.apache.org/jira/browse/SPARK-19023
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 2.0.2
>            Reporter: Julien MASSIOT
>            Priority: Major
>              Labels: bulk-closed, memory-leak
>
> I am facing an OOM within a Spark Streaming application that uses GraphX.
> While trying to reproduce the issue in a simple application, I was able to identify what appear to be two kinds of memory leak.
>   
> *Leak 1*
> It can be reproduced with this simple Scala application, which roughly simulates what my Spark Streaming application does: each iteration of the loop simulates one micro-batch.
> {code:title=TestGraph.scala|borderStyle=solid}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.graphx.Graph
> import org.apache.spark.rdd.RDD
> import org.apache.spark.graphx._
> object TestGraph {
>     case class ImpactingEvent(entityInstance: String)
>     case class ImpactedNode(entityIsntance:String)
>     case class RelationInstance(relationType : String)
>     var impactingGraph : Graph[ImpactedNode, RelationInstance] = null;
>     
>     def main(args: Array[String]) {
>       val conf = new SparkConf().setAppName("TestImpactingPropagation").setMaster("local")
>       conf.set("spark.checkpoint.checkpointAllMarkedAncestors", "True")
>       val sc = new SparkContext(conf)
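>       sc.setLogLevel("ERROR")
>       // Assumption: the report does not show a checkpoint directory, but checkpoint() needs one to be set; the path is illustrative.
>       sc.setCheckpointDir("/tmp/spark-checkpoint")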
>      
>       val vertices: RDD[(VertexId, ImpactedNode)] = sc.parallelize(Array( (1L, ImpactedNode("Node1")), (2L, ImpactedNode("Node2")), (3L, ImpactedNode("Node3"))))
>       
>       val edges: RDD[Edge[RelationInstance]] =  sc.parallelize(Array( Edge(1L, 2L, RelationInstance("Required")), Edge(1L, 2L, RelationInstance("Failover"))))
>         
>       impactingGraph = Graph(vertices, edges, null)
>       
>       for(x <- 1 to 10){
>         impactingGraph = propagateEvent(impactingGraph, ImpactingEvent("node1"), sc)
>         
>         impactingGraph.checkpoint()
>         impactingGraph.edges.count()
>         impactingGraph.vertices.count()
>       }
>       println("Hello")
>       Thread.sleep(10000000)
>     }
>     
>     private def propagateEvent(impactingGraph: Graph[ImpactedNode, RelationInstance], event: ImpactingEvent, sc:SparkContext): Graph[ImpactedNode, RelationInstance] = {
>       var graph = impactingGraph.mapVertices((id, node) => node ).cache
>       impactingGraph.unpersist(true)
>       graph.cache();
>     }
> }
> {code}
>   
> In this simple application, I just apply a mapVertices transformation to the graph and then checkpoint the graph. I repeat this operation 10 times.
> After the application has finished the loop, I take a heap dump.
>   
> In this heap dump, I can see 11 "live" GraphImpl instances in memory.
> I expected only 1 (the one referenced by the global variable impactingGraph).
>   
> The "leak" is coming from the f function in a MapPartitionsRDD (which is referenced by the partitionsRDD variable of my VertexRDD).
> This f function contains an outer reference to the graph created in the previous iteration.
> I can see that in the clearDependencies function of MapPartitionsRDD, the f function is not reset to null.
>   
> While looking for similar issues, I found this pull request:
> [https://github.com/apache/spark/pull/3545]
> In this pull request, the f variable is reset to null in the clearDependencies method of ZippedPartitionsRDD.
> I do not understand why the same is not done in MapPartitionsRDD.
> I tried patching spark-core to set f to null in clearDependencies of MapPartitionsRDD, and it fixed the leak in this simple use case.
> Shouldn't the f variable also be reset to null in MapPartitionsRDD?
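
The retention described for Leak 1 can be imitated without Spark. What follows is a minimal toy model, not Spark's source: ToyGraph, ToyMapRDD, PatchedToyMapRDD and retained are hypothetical names invented for illustration. It only assumes what the report states, namely that clearDependencies drops the parent reference but leaves f in place, and that f captures the graph of the previous iteration.

```scala
// Toy model only: these classes imitate the shape described in the report,
// they are NOT Spark's real MapPartitionsRDD or GraphImpl.
object ClosureLeakSketch {
  // Stand-in for the graph created by the previous iteration.
  final class ToyGraph(val id: Int)

  // Stand-in for MapPartitionsRDD: a parent pointer plus a function `f`.
  class ToyMapRDD(var prev: AnyRef, var f: () => ToyGraph) {
    // Behaviour described in the report: only the parent is dropped.
    def clearDependencies(): Unit = { prev = null }
  }

  // Stand-in for the reporter's patch: `f` is dropped as well.
  class PatchedToyMapRDD(p: AnyRef, fn: () => ToyGraph) extends ToyMapRDD(p, fn) {
    override def clearDependencies(): Unit = {
      super.clearDependencies()
      f = null
    }
  }

  // Id of the graph still reachable through `f`, if any.
  def retained(rdd: ToyMapRDD): Option[Int] = Option(rdd.f).map(fn => fn().id)

  def main(args: Array[String]): Unit = {
    val previousGraph = new ToyGraph(1)

    // The lambda captures previousGraph, as the mapVertices closure would.
    val unpatched = new ToyMapRDD(new Object, () => previousGraph)
    unpatched.clearDependencies()
    println(retained(unpatched)) // Some(1): the old graph is still reachable

    val patched = new PatchedToyMapRDD(new Object, () => previousGraph)
    patched.clearDependencies()
    println(retained(patched)) // None: nothing keeps the old graph alive
  }
}
```

In the toy, as in the report, the surviving function object is the only path back to the older graph once the parent pointer is cleared, which is why nulling it is enough to break the chain.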
>   
> *Leak 2*
> Now I do the same, but in the propagateEvent method I call joinVertices on the graph in addition to mapVertices.
> The full application follows:
> {code:title=TestGraph.scala|borderStyle=solid}
> import org.apache.spark.SparkConf
> import org.apache.spark.SparkContext
> import org.apache.spark.graphx.Graph
> import org.apache.spark.rdd.RDD
> import org.apache.spark.graphx._
> object TestGraph {
>     case class ImpactingEvent(entityInstance: String)
>     case class ImpactedNode(entityIsntance:String)
>     case class RelationInstance(relationType : String)
>     var impactingGraph : Graph[ImpactedNode, RelationInstance] = null;
>     
>     def main(args: Array[String]) {
>       val conf = new SparkConf().setAppName("TestImpactingPropagation").setMaster("local")
>       conf.set("spark.checkpoint.checkpointAllMarkedAncestors", "True")
>       val sc = new SparkContext(conf)
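>       sc.setLogLevel("ERROR")
>       // Assumption: the report does not show a checkpoint directory, but checkpoint() needs one to be set; the path is illustrative.
>       sc.setCheckpointDir("/tmp/spark-checkpoint")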
>      
>       val vertices: RDD[(VertexId, ImpactedNode)] = sc.parallelize(Array( (1L, ImpactedNode("Node1")), (2L, ImpactedNode("Node2")), (3L, ImpactedNode("Node3"))))
>       
>       val edges: RDD[Edge[RelationInstance]] =  sc.parallelize(Array( Edge(1L, 2L, RelationInstance("Required")), Edge(1L, 2L, RelationInstance("Failover"))))
>         
>       impactingGraph = Graph(vertices, edges, null)
>       
>       for(x <- 1 to 10){
>         impactingGraph = propagateEvent(impactingGraph, ImpactingEvent("node1"), sc)
>         
>         impactingGraph.checkpoint()
>         impactingGraph.edges.count()
>         impactingGraph.vertices.count()
>       }
>       println("Hello")
>       Thread.sleep(10000000)
>     }
>     
>     private def propagateEvent(impactingGraph: Graph[ImpactedNode, RelationInstance], event: ImpactingEvent, sc:SparkContext): Graph[ImpactedNode, RelationInstance] = {
>       var graph = impactingGraph.mapVertices((id, node) => node ).cache
>       val verticesToJoin: RDD[(VertexId, String)] = sc.parallelize(Array( (1L, "Node1"), (2L, "Node2")) )
>       graph = graph.joinVertices(verticesToJoin)({(id,src,toJoin)=>src})
>       impactingGraph.unpersist(true)
>       graph.cache();
>     }
> }
> {code}
> When running this application and taking a heap dump, I can still see 11 "live" GraphImpl instances in memory (where I expect only 1), even with the patch described in the previous section.
> Analyzing this dump, I can see that the "leak" comes from a reference to an array of partitions held by the "partitions_" variable of the EdgeRDD. This array of partitions contains a reference to the MapPartitionsRDD, which in turn references the graph created by the previous iteration, as described in the *Leak 1* section.
> This array of partitions is referenced twice:
> * once in the "partitions_" variable of the partitionsRDD embedded within the EdgeRDD
> * once in the "partitions_" variable of the EdgeRDD itself
> This comes from the getPartitions method of the EdgeRDD:
> {code:title=EdgeRDD.scala|borderStyle=solid}
>   override protected def getPartitions: Array[Partition] = partitionsRDD.partitions
> {code}
> After checkpoint and count are called on the graph edges, the reference to this array is cleared within the partitionsRDD of the EdgeRDD.
> This is done through this call:
> {code:title=RDD.scala|borderStyle=solid}
>   /**
>    * Changes the dependencies of this RDD from its original parents to a new RDD (`newRDD`)
>    * created from the checkpoint file, and forget its old dependencies and partitions.
>    */
>   private[spark] def markCheckpointed(): Unit = {
>     clearDependencies()
>     partitions_ = null
>     deps = null    // Forget the constructor argument for dependencies too
>   }
> {code}
> But this is not done for the "partitions_" variable of the EdgeRDD itself.
> Indeed, markCheckpointed() is not called on the EdgeRDD itself but only on the partitionsRDD embedded within it.
> As a result, we still have a reference to this array of partitions (which references a MapPartitionsRDD that references the graph of the previous iteration).
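
The double reference described for Leak 2 can also be modelled in a few lines. This is a hedged toy sketch, not the real RDD or EdgeRDD code: ToyPartition, ToyRDD, InnerRDD, WrapperRDD and cachedPartitions are hypothetical names, and the toy markCheckpointed only clears the cached array. It assumes, as the report describes, that the wrapper's getPartitions returns the inner RDD's array and that only the inner RDD is marked as checkpointed.

```scala
// Toy model only: it imitates the caching pattern described in the report,
// it is NOT Spark's real RDD / EdgeRDD implementation.
object SharedPartitionsSketch {
  // A partition that (indirectly) references the lineage it came from.
  final class ToyPartition(val index: Int, val lineage: AnyRef)

  abstract class ToyRDD {
    private var partitions_ : Array[ToyPartition] = null
    protected def getPartitions: Array[ToyPartition]

    // Lazily computed and then cached.
    final def partitions: Array[ToyPartition] = {
      if (partitions_ == null) partitions_ = getPartitions
      partitions_
    }

    // Simplified: only forgets the cached partitions.
    def markCheckpointed(): Unit = { partitions_ = null }

    // Hypothetical helper to inspect the cache from outside.
    def cachedPartitions: Option[Array[ToyPartition]] = Option(partitions_)
  }

  // Stand-in for the partitionsRDD embedded in the EdgeRDD.
  class InnerRDD(lineage: AnyRef) extends ToyRDD {
    protected def getPartitions: Array[ToyPartition] =
      Array(new ToyPartition(0, lineage))
  }

  // Stand-in for the EdgeRDD: it hands out the inner RDD's array.
  class WrapperRDD(val partitionsRDD: ToyRDD) extends ToyRDD {
    protected def getPartitions: Array[ToyPartition] = partitionsRDD.partitions
  }

  def main(args: Array[String]): Unit = {
    val previousGraph = new Object // stands for the graph of the previous iteration
    val inner = new InnerRDD(previousGraph)
    val edges = new WrapperRDD(inner)

    edges.partitions         // both objects now cache the same array
    inner.markCheckpointed() // only the embedded RDD is cleaned up

    println(inner.cachedPartitions.isEmpty) // true
    // The wrapper still reaches the old graph through its own cached array.
    println(edges.cachedPartitions.exists(_.head.lineage eq previousGraph)) // true
  }
}
```

Calling edges.markCheckpointed() as well empties the wrapper's cache in this toy, which corresponds to the first remedy proposed in the report.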
> I can fix this leak by calling checkpoint and count on the edges right after the mapVertices and before the joinVertices (provided the patch described in the previous section is applied to MapPartitionsRDD).
> But this doesn't seem clean to me.
> In my view:
> * either the "partitions_" variable of the EdgeRDD should be reset to null after a checkpoint is called on the Graph
> * or the "partitions_" variable of the EdgeRDD should not reference the same array of partitions as the "partitions_" variable of the partitionsRDD (I don't know whether this "partitions_" is really useful on the EdgeRDD)
> What do you think?


