Posted to issues@spark.apache.org by "CacheCheck (Jira)" <ji...@apache.org> on 2020/03/22 14:42:00 UTC
[jira] [Commented] (SPARK-29878) Improper cache strategies in GraphX
[ https://issues.apache.org/jira/browse/SPARK-29878?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17064297#comment-17064297 ]
CacheCheck commented on SPARK-29878:
------------------------------------
I also found that newEdges.cache() in GraphImpl.partitionBy(), vertices.cache() in GraphImpl.subgraph(), and vertices.cache() in GraphImpl.aggregateMessagesWithActiveSet() are in the same situation: they are unnecessary caches when I run TriangleCountingExample in the examples.graphx package.
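The "unnecessary cache" pattern can be reproduced outside GraphX. The following is only a minimal sketch, assuming a local Spark installation; the object UnneededParentCache and its methods are hypothetical names, not GraphX code. It counts how often a parent RDD is evaluated when only its cached child is read by two actions, with and without a cache on the parent.

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical demo object; none of these names come from GraphX.
object UnneededParentCache {

  // Counts how often the parent's map function runs when only the cached
  // child is read by two actions.
  def parentEvaluations(sc: SparkContext, cacheParent: Boolean): Long = {
    val evals = sc.longAccumulator("parentEvals")
    val parent = sc.parallelize(1 to 1000, 4).map { x =>
      evals.add(1L) // one increment per element the parent computes
      x * 2
    }
    if (cacheParent) parent.cache()
    val child = parent.map(_ + 1).cache()

    child.count() // first action: computes parent, then fills child's cache
    child.count() // second action: served from child's cache

    child.unpersist()
    if (cacheParent) parent.unpersist()
    evals.value.longValue()
  }

  // Returns (evaluations with the parent cached, evaluations without).
  def demo(): (Long, Long) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("unneeded-parent-cache")
    val sc = new SparkContext(conf)
    try {
      (parentEvaluations(sc, cacheParent = true),
        parentEvaluations(sc, cacheParent = false))
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (cached, uncached) = demo()
    println(s"parent evaluations: cached=$cached, uncached=$uncached")
  }
}
```

In a local run without task retries or cache eviction, both variants should report the same number of parent evaluations, which suggests the parent's cache only occupies memory in this access pattern. Whether that also holds for a given GraphX workload depends on whether anything else reads the parent.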
> Improper cache strategies in GraphX
> -----------------------------------
>
> Key: SPARK-29878
> URL: https://issues.apache.org/jira/browse/SPARK-29878
> Project: Spark
> Issue Type: Improvement
> Components: GraphX
> Affects Versions: 3.0.0
> Reporter: CacheCheck
> Priority: Major
>
> I ran examples.graphx.SSSPExample and examined the RDD dependency graphs and the persist operations. GraphX uses some improper cache strategies. The same problems also appear when I run ConnectedComponentsExample.
> 1. vertices.cache() and newEdges.cache() are unnecessary
> In SSSPExample, the graph is initialized by GraphImpl.mapVertices(). This method creates a GraphImpl object using GraphImpl.apply(vertices, edges), and apply() caches the RDDs vertices and newEdges. But SSSPExample never uses these two RDDs directly again (their child RDDs have been cached), so these persists can be unnecessary here.
> However, other examples may need these two persists, so I think they cannot simply be removed. This might be hard to fix.
> {code:scala}
> def apply[VD: ClassTag, ED: ClassTag](
>     vertices: VertexRDD[VD],
>     edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
>   vertices.cache() // Unnecessary for SSSPExample and ConnectedComponentsExample
>   // Convert the vertex partitions in edges to the correct type
>   val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
>     .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
>     .cache() // Unnecessary for SSSPExample and ConnectedComponentsExample
>   GraphImpl.fromExistingRDDs(vertices, newEdges)
> }
> {code}
> 2. Missing persist on newEdges
> SSSPExample invokes Pregel to do the computation, and Pregel utilizes ReplicatedVertexView.upgrade(). I find that the RDD newEdges is used directly by multiple actions in Pregel, so newEdges should be persisted.
> As with the issue above, this one is also found in ConnectedComponentsExample. It is also hard to fix, because the added persist may be unnecessary for other examples.
> {code:scala}
> // Pregel.scala
> // compute the messages
> var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) // newEdges is created here
> val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
>   checkpointInterval, graph.vertices.sparkContext)
> messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
> var activeMessages = messages.count() // The first action that uses newEdges
> ...
> while (activeMessages > 0 && i < maxIterations) {
>   // Receive the messages and update the vertices.
>   prevG = g
>   g = g.joinVertices(messages)(vprog) // Generating g depends on newEdges
>   ...
>   activeMessages = messages.count() // The second action that uses newEdges. newEdges should be unpersisted after this instruction.
> {code}
> {code:scala}
> // ReplicatedVertexView.scala
> def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean): Unit = {
>   ...
>     val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
>       (ePartIter, shippedVertsIter) => ePartIter.map {
>         case (pid, edgePartition) =>
>           (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
>       }
>     })
>     edges = newEdges // newEdges should be persisted
>     hasSrcId = includeSrc
>     hasDstId = includeDst
>   }
> }
> {code}
> I don't have much knowledge of GraphX, so I don't know how to fix these issues well.
> This issue was reported by our tool CacheCheck, which dynamically detects persist()/unpersist() API misuses.
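The missing-persist pattern from item 2 of the quoted report (one RDD read by several actions) can be illustrated with plain RDDs. This is a hedged sketch, assuming a local Spark installation; PersistAcrossActions and its methods are hypothetical names, and the code does not touch Pregel or ReplicatedVertexView. It counts how often a shared RDD is evaluated across two actions, with and without persist().

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.storage.StorageLevel

// Hypothetical demo object; it only mimics "one RDD, several actions".
object PersistAcrossActions {

  // Counts how often the shared RDD's map function runs across two actions.
  def evaluations(sc: SparkContext, persist: Boolean): Long = {
    val evals = sc.longAccumulator("sharedEvals")
    val shared = sc.parallelize(1 to 1000, 4).map { x =>
      evals.add(1L) // one increment per element computed
      x.toLong
    }
    if (persist) shared.persist(StorageLevel.MEMORY_ONLY)

    shared.count()       // first action on the shared RDD
    shared.reduce(_ + _) // second action on the same RDD

    // Release the blocks once the last consumer has run.
    if (persist) shared.unpersist()
    evals.value.longValue()
  }

  // Returns (evaluations without persist, evaluations with persist).
  def demo(): (Long, Long) = {
    val conf = new SparkConf().setMaster("local[2]").setAppName("persist-across-actions")
    val sc = new SparkContext(conf)
    try {
      (evaluations(sc, persist = false), evaluations(sc, persist = true))
    } finally {
      sc.stop()
    }
  }

  def main(args: Array[String]): Unit = {
    val (withoutPersist, withPersist) = demo()
    println(s"evaluations: without persist=$withoutPersist, with persist=$withPersist")
  }
}
```

In a local run without task retries, the unpersisted variant should evaluate every element once per action, while the persisted variant should evaluate it once in total. The unpersist() call after the last action mirrors the report's remark that newEdges should be released once its final consumer has run.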
--
This message was sent by Atlassian Jira
(v8.3.4#803005)