Posted to issues@spark.apache.org by "CacheCheck (Jira)" <ji...@apache.org> on 2020/03/22 14:42:00 UTC

[jira] [Commented] (SPARK-29878) Improper cache strategies in GraphX

    [ https://issues.apache.org/jira/browse/SPARK-29878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064297#comment-17064297 ] 

CacheCheck commented on SPARK-29878:
------------------------------------

I also found that newEdges.cache() in GraphImpl.partitionBy(), vertices.cache() in GraphImpl.subgraph(), and vertices.cache() in GraphImpl.aggregateMessagesWithActiveSet() have the same problem: they are unnecessary caches when running TriangleCountingExample in the examples.graphx package.
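
To show why such a persist can buy nothing, here is a minimal sketch in plain Scala with no Spark dependency. It models a lineage as a DAG of hypothetical `Node` values (not a Spark class) and counts how often each node is computed across several actions. It assumes that a cached node is computed once and never evicted. Under that assumption, caching a parent whose only consumer is already cached leaves every count unchanged.

```scala
import scala.collection.mutable

// Hypothetical lineage node (not a Spark class): a dataset and the datasets it is derived from.
final case class Node(name: String, parents: List[Node])

object LineageModel {
  /** Counts how many times each node is computed while running `actions` in order.
    * Assumption: a node in `cached` is computed at most once and never evicted;
    * any other node is recomputed every time something downstream needs it. */
  def computeCounts(actions: Seq[Node], cached: Set[String]): Map[String, Int] = {
    val counts = mutable.Map.empty[String, Int].withDefaultValue(0)
    val materialized = mutable.Set.empty[String] // cached nodes already computed

    def eval(n: Node): Unit =
      if (!materialized(n.name)) {  // a materialized cached node is simply read back
        counts(n.name) += 1         // otherwise the node is computed ...
        n.parents.foreach(eval)     // ... which first requires its parents
        if (cached(n.name)) materialized += n.name
      }

    actions.foreach(eval)
    counts.toMap
  }
}

object LineageDemo {
  // Illustrative lineage: a cached child derived from vertices and edges, used by two actions.
  val vertices = Node("vertices", Nil)
  val edges    = Node("edges", Nil)
  val child    = Node("child", List(vertices, edges))
  val actions  = Seq(child, child)

  val withParentCache    = LineageModel.computeCounts(actions, Set("vertices", "child"))
  val withoutParentCache = LineageModel.computeCounts(actions, Set("child"))
  val noCache            = LineageModel.computeCounts(actions, Set.empty)
}
```

In `LineageDemo`, `withParentCache` and `withoutParentCache` are equal: every node is computed once. `noCache` computes each node twice. If cached blocks can be evicted, caching the parent could still avoid some recomputation, so the sketch only shows that the cache is redundant under the no-eviction assumption.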

> Improper cache strategies in GraphX
> -----------------------------------
>
>                 Key: SPARK-29878
>                 URL: https://issues.apache.org/jira/browse/SPARK-29878
>             Project: Spark
>          Issue Type: Improvement
>          Components: GraphX
>    Affects Versions: 3.0.0
>            Reporter: CacheCheck
>            Priority: Major
>
> I ran examples.graphx.SSSPExample and inspected the RDD dependency graphs as well as the persist operations. There are some improper cache strategies in GraphX. The same problems also appear when running ConnectedComponentsExample.
> 1. vertices.cache() and newEdges.cache() are unnecessary
> In SSSPExample, the graph is initialized by GraphImpl.mapVertices(). In this method, a GraphImpl object is created using GraphImpl.apply(vertices, edges), and the RDDs vertices and newEdges are cached in apply(). However, these two RDDs are no longer used directly in SSSPExample (their child RDDs have been cached), so the persists are unnecessary here.
> Other examples may still need these two persists, so they cannot simply be removed. This might be hard to fix.
> {code:scala}
>   def apply[VD: ClassTag, ED: ClassTag](
>       vertices: VertexRDD[VD],
>       edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
>     vertices.cache() // Unnecessary for SSSPExample and ConnectedComponentsExample
>     // Convert the vertex partitions in edges to the correct type
>     val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
>       .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
>       .cache() // Unnecessary for SSSPExample and ConnectedComponentsExample
>     GraphImpl.fromExistingRDDs(vertices, newEdges)
>   }
> {code}
> 2. Missing persist on newEdges
> SSSPExample runs Pregel, which uses ReplicatedVertexView.upgrade(). I found that the RDD newEdges created in upgrade() is used directly by multiple actions in Pregel, so newEdges should be persisted.
> As with the first issue, this one also appears in ConnectedComponentsExample. It is also hard to fix, because an added persist may be unnecessary for other examples.
> {code:scala}
> // Pregel.scala
>     // compute the messages
>     var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) // newEdges is created here
>     val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
>       checkpointInterval, graph.vertices.sparkContext)
>     messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
>     var activeMessages = messages.count() // First action that uses newEdges
>     ...
>     while (activeMessages > 0 && i < maxIterations) {
>       // Receive the messages and update the vertices.
>       prevG = g
>       g = g.joinVertices(messages)(vprog) // Generating g depends on newEdges
>       ...
>       activeMessages = messages.count() // Second action that uses newEdges; newEdges should be unpersisted after this statement.
> {code}
> {code:scala}
> // ReplicatedVertexView.scala
>   def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean): Unit = {
>       ...
>       val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
>         (ePartIter, shippedVertsIter) => ePartIter.map {
>           case (pid, edgePartition) =>
>             (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
>         }
>       })
>       edges = newEdges // newEdges should be persisted
>       hasSrcId = includeSrc
>       hasDstId = includeDst
>     }
>   }
> {code}
> As I don't know GraphX well, I am not sure how best to fix these issues.
> This issue was reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.
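
For the missing persist in point 2, the usual Spark remedy is to persist an RDD before the first action that reads it and unpersist it after the last one, as the comment on the second `messages.count()` suggests. The sketch below is a hypothetical planning helper in plain Scala. It is not part of Spark or CacheCheck. For each action in order, it takes the set of datasets that action reads directly (not through a cached descendant). It then reports which datasets are read by two or more actions, along with the window of action indices during which each should stay persisted.

```scala
// Hypothetical helper, not a Spark or CacheCheck API.
object PersistPlanner {
  /** `actionReads(i)` is the set of datasets that action i reads directly.
    * Returns, for every dataset read by at least two actions, the indices of the
    * first and last such action: persist before the first, unpersist after the last.
    * Datasets read by a single action are omitted, since persisting them saves no work. */
  def persistWindows(actionReads: Seq[Set[String]]): Map[String, (Int, Int)] = {
    val uses: Map[String, Seq[Int]] =
      actionReads.zipWithIndex
        .flatMap { case (reads, i) => reads.toSeq.map(name => (name, i)) }
        .groupBy(_._1)
        .map { case (name, pairs) => name -> pairs.map(_._2) }
    uses.collect { case (name, idx) if idx.size >= 2 => name -> ((idx.min, idx.max)) }
  }
}

object PregelLikeDemo {
  // Loosely modeled on the Pregel excerpt: both count() actions read newEdges.
  // The messages0/messages1 names are illustrative only.
  val plan: Map[String, (Int, Int)] = PersistPlanner.persistWindows(Seq(
    Set("newEdges", "messages0"), // first messages.count()
    Set("newEdges", "messages1")  // second messages.count()
  ))
}
```

Here `PregelLikeDemo.plan` contains only `newEdges -> (0, 1)`. In other words, newEdges would be persisted before the first `count()` and unpersisted after the second. In real Spark code the corresponding calls are `RDD.persist()` and `RDD.unpersist()`. Where to place them inside Pregel without hurting other callers is the open question of this issue.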


