You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "Dong Wang (Jira)" <ji...@apache.org> on 2019/11/13 12:18:00 UTC

[jira] [Created] (SPARK-29878) Improper cache strategies in GraphX

Dong Wang created SPARK-29878:
---------------------------------

             Summary: Improper cache strategies in GraphX
                 Key: SPARK-29878
                 URL: https://issues.apache.org/jira/browse/SPARK-29878
             Project: Spark
          Issue Type: Improvement
          Components: GraphX
    Affects Versions: 3.0.0
            Reporter: Dong Wang


I have run examples.graphx.SSPExample and looked through the RDD dependency graphs as well as persist operations. There are some improper cache strategies in GraphX. The same situations also exist when I run ConnectedComponentsExample.

1.  vertices.cache() and newEdges.cache() are unnecessary
In SSPExample, a graph is initialized by GraphImpl.mapVertices(). In this method, a GraphImpl object is created using GraphImpl.apply(vertices, edges), and RDD vertices/newEdges are cached in apply(). But these two RDDs are not directly used anymore (their children RDDs has been cached) in SSPExample, so the persists can be unnecessary here. 
However, the other examples may need these two persists, so I think they cannot be simply removed. It might be hard to fix.
{code:scala}
  def apply[VD: ClassTag, ED: ClassTag](
      vertices: VertexRDD[VD],
      edges: EdgeRDD[ED]): GraphImpl[VD, ED] = {
    vertices.cache() // It is unnecessary for SSPExample and ConnectedComponentsExample
    // Convert the vertex partitions in edges to the correct type
    val newEdges = edges.asInstanceOf[EdgeRDDImpl[ED, _]]
      .mapEdgePartitions((pid, part) => part.withoutVertexAttributes[VD])
      .cache() // It is unnecessary for SSPExample and ConnectedComponentsExample
    GraphImpl.fromExistingRDDs(vertices, newEdges)
  }
{code}

2. Missing persist on newEdges
SSSPExample will invoke pregel to do execution. Pregel will ultilize ReplicatedVertexView.upgrade(). I find that RDD newEdges will be directly use by multiple actions in Pregel. So newEdges should be persisted.
Same as the above issue, this issue is also found in ConnectedComponentsExample. It is also hard to fix, because the persist added may be unnecessary for other examples.
{code:scala}
// Pregel.scala
    // compute the messages
    var messages = GraphXUtils.mapReduceTriplets(g, sendMsg, mergeMsg) // newEdges is created here
    val messageCheckpointer = new PeriodicRDDCheckpointer[(VertexId, A)](
      checkpointInterval, graph.vertices.sparkContext)
    messageCheckpointer.update(messages.asInstanceOf[RDD[(VertexId, A)]])
    var activeMessages = messages.count() // The first time use newEdges
    ...
    while (activeMessages > 0 && i < maxIterations) {
      // Receive the messages and update the vertices.
      prevG = g
      g = g.joinVertices(messages)(vprog) // Generate g will depends on newEdges
      ...
      activeMessages = messages.count() // The second action to use newEdges. newEdges should be unpersisted after this instruction.
{code}
{code:scala}
// ReplicatedVertexView.scala
  def upgrade(vertices: VertexRDD[VD], includeSrc: Boolean, includeDst: Boolean): Unit = {
      ...
       val newEdges = edges.withPartitionsRDD(edges.partitionsRDD.zipPartitions(shippedVerts) {
        (ePartIter, shippedVertsIter) => ePartIter.map {
          case (pid, edgePartition) =>
            (pid, edgePartition.updateVertices(shippedVertsIter.flatMap(_._2.iterator)))
        }
      })
      edges = newEdges // newEdges should be persisted
      hasSrcId = includeSrc
      hasDstId = includeDst
    }
  }
{code}
As I don't have much knowledge about Graphx, so I don't know how to fix these issues well.

This issue is reported by our tool CacheCheck, which is used to dynamically detecting persist()/unpersist() api misuses.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org