Posted to dev@spark.apache.org by zhang juntao <ju...@gmail.com> on 2016/04/10 18:08:59 UTC
spark graphx storage RDD memory leak
hi experts,
I'm reporting a problem with Spark GraphX. I use Zeppelin to submit Spark jobs;
note that the Zeppelin Scala environments all share the same SparkContext and SQLContext instance.
I call the connected components algorithm for some of our business logic,
and I found that every time a job finished, some graph storage RDDs weren't being released.
After several runs there were a lot of storage RDDs still cached even though all the jobs had finished.

<PastedGraphic-1.png>

So I checked the code of connectedComponents and found what may be a problem in Pregel.scala:
once the graph passed in as a parameter has been cached, there is no way to unpersist it,
so I added the code below (the two lines marked "// added"; they were in red in the original message) to solve the problem:
def apply[VD: ClassTag, ED: ClassTag, A: ClassTag]
   (graph: Graph[VD, ED],
    initialMsg: A,
    maxIterations: Int = Int.MaxValue,
    activeDirection: EdgeDirection = EdgeDirection.Either)
   (vprog: (VertexId, VD, A) => VD,
    sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
    mergeMsg: (A, A) => A)
  : Graph[VD, ED] =
{
  ......
  var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
  graph.unpersistVertices(blocking = false)  // added: release the input graph's vertices
  graph.edges.unpersist(blocking = false)    // added: release the input graph's edges
  ......
} // end of apply
I'm not sure if this is a bug.
Thank you for your time,
juntao
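
For reference, here is a minimal sketch of how the leftover cached RDDs can be observed. This snippet is illustrative and not from the original email: the tiny test graph is made up, and it assumes the Zeppelin/shell-provided sc.

import org.apache.spark.graphx.{Edge, Graph}

// Build a small throwaway graph (hypothetical test data).
val edges = sc.parallelize(Seq(Edge(1L, 2L, 1), Edge(2L, 3L, 1), Edge(4L, 5L, 1)))
val graph = Graph.fromEdges(edges, defaultValue = 0)

// Snapshot the set of cached-RDD ids before and after running the algorithm.
val before = sc.getPersistentRDDs.keySet
graph.connectedComponents().vertices.count()
val leftover = sc.getPersistentRDDs.keySet -- before
// The returned component graph is cached by design, but on 1.6 the RDDs
// backing the intermediate ccGraph also remain in this set.
println(s"cached RDDs left behind: ${leftover.size}")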
Fwd: spark graphx storage RDD memory leak
Posted by zhang juntao <ju...@gmail.com>.
Yes, I'm using version 1.6. Thanks, Ted.
> Begin forwarded message:
>
> From: Robin East <ro...@xense.co.uk>
> Subject: Re: spark graphx storage RDD memory leak
> Date: April 12, 2016 at 2:13:10 AM GMT+8
> To: zhang juntao <ju...@gmail.com>
> Cc: Ted Yu <yu...@gmail.com>, dev@spark.apache.org
>
> this looks like https://issues.apache.org/jira/browse/SPARK-12655, fixed in 2.0
Re: spark graphx storage RDD memory leak
Posted by Robin East <ro...@xense.co.uk>.
this looks like https://issues.apache.org/jira/browse/SPARK-12655, fixed in 2.0
-------------------------------------------------------------------------------
Robin East
Spark GraphX in Action, by Michael Malak and Robin East
Manning Publications Co.
http://www.manning.com/books/spark-graphx-in-action
Fwd: spark graphx storage RDD memory leak
Posted by zhang juntao <ju...@gmail.com>.
Thanks, Ted, for replying.
Those three lines can't release the cached graph parameter; they only release g (graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()).
In ConnectedComponents.scala the graph parameter is cached as ccGraph and is never released in Pregel:
def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED]): Graph[VertexId, ED] = {
  val ccGraph = graph.mapVertices { case (vid, _) => vid }
  def sendMessage(edge: EdgeTriplet[VertexId, ED]): Iterator[(VertexId, VertexId)] = {
    if (edge.srcAttr < edge.dstAttr) {
      Iterator((edge.dstId, edge.srcAttr))
    } else if (edge.srcAttr > edge.dstAttr) {
      Iterator((edge.srcId, edge.dstAttr))
    } else {
      Iterator.empty
    }
  }
  val initialMessage = Long.MaxValue
  Pregel(ccGraph, initialMessage, activeDirection = EdgeDirection.Either)(
    vprog = (id, attr, msg) => math.min(attr, msg),
    sendMsg = sendMessage,
    mergeMsg = (a, b) => math.min(a, b))
} // end of connectedComponents
thanks
juntao
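
Until a fixed version is available, a possible caller-side workaround is to pull the result off the cluster and then unpersist every RDD that was newly cached during the call. Below is a rough sketch, not from the original thread: the helper name componentsAndClean is made up, it assumes the vertex set is small enough to collect to the driver, and it assumes no concurrent job caches RDDs in the same window (relevant for a shared Zeppelin SparkContext).

import org.apache.spark.SparkContext
import org.apache.spark.graphx.{Graph, VertexId}
import scala.reflect.ClassTag

// Hypothetical helper: run connectedComponents, keep only the collected
// result, and release everything the algorithm cached along the way.
def componentsAndClean[VD: ClassTag, ED: ClassTag](
    sc: SparkContext, graph: Graph[VD, ED]): Map[VertexId, VertexId] = {
  val before = sc.getPersistentRDDs.keySet   // cached-RDD ids before the call
  val cc = graph.connectedComponents()
  val result = cc.vertices.collect().toMap   // materialize and copy the result
  // Unpersist anything cached by the call, including the result graph itself.
  for ((id, rdd) <- sc.getPersistentRDDs if !before.contains(id)) {
    rdd.unpersist(blocking = false)
  }
  result
}

On 2.0, per the SPARK-12655 fix mentioned above, this cleanup should no longer be necessary.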
Re: spark graphx storage RDD memory leak
Posted by Ted Yu <yu...@gmail.com>.
I see the following code toward the end of the method:
// Unpersist the RDDs hidden by newly-materialized RDDs
oldMessages.unpersist(blocking = false)
prevG.unpersistVertices(blocking = false)
prevG.edges.unpersist(blocking = false)
Wouldn't the above achieve the same effect?
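
(For context: in the 1.6 Pregel loop those lines sit roughly as sketched below. This is a paraphrase with elisions, not a verbatim copy of Pregel.scala; the point is that prevG only ever points at graphs created inside the loop, so the graph argument itself is never unpersisted.)

var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
// ... compute the initial messages ...
while (activeMessages > 0 && i < maxIterations) {
  // ... join the new vertex values into newVerts ...
  prevG = g                                  // the previous iteration's graph
  g = g.outerJoinVertices(newVerts) { (vid, old, newOpt) => newOpt.getOrElse(old) }
  g.cache()
  // ... compute the next round of messages ...
  oldMessages.unpersist(blocking = false)
  prevG.unpersistVertices(blocking = false)  // releases the old g, not `graph`
  prevG.edges.unpersist(blocking = false)
  i += 1
}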