Posted to issues@spark.apache.org by "JJ Zhang (JIRA)" <ji...@apache.org> on 2015/08/28 20:42:46 UTC

[jira] [Commented] (SPARK-10335) GraphX Connected Components fail with large number of iterations

    [ https://issues.apache.org/jira/browse/SPARK-10335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14720385#comment-14720385 ] 

JJ Zhang commented on SPARK-10335:
----------------------------------

Note that I've changed the return signature of the algorithm to lower the amount of data that needs to be checkpointed; you can always join back to the original graph to recover any additional vertex attributes.
I also changed the pregel signature to return the number of iterations. I think this may be a good idea for the general Pregel API as well, since the need for periodic lineage truncation is common to all iterative graph algorithms.
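For example, to recover the original vertex attributes alongside the component IDs, a minimal sketch (assuming the input graph is still in scope as `graph` and the result as `g2`):
{code}
// Hypothetical sketch: join the component IDs back onto the original
// vertex attributes. `graph` is the input graph; `g2` is the result
// of RobustConnectedComponents.run.
val withAttrs = graph.outerJoinVertices(g2.vertices) {
  (vid, attr, ccOpt) => (attr, ccOpt.getOrElse(vid))
}
{code}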
For a simple test, I used the code below:
{code}
import org.apache.spark.graphx._
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
/**
 * @author jj.zhang
 */
object SampleGraph extends java.io.Serializable {
  
  /**
   * Create a graph with some long chains of connected vertices to test ConnectedComponents
   */
  def createGraph(sc: SparkContext): Graph[VertexId, String] = {
    // Three disjoint chains of 50, 19, and 200 vertices; each vertex's
    // attribute is initialized to its own ID.
    val v: RDD[(VertexId, VertexId)] = sc.parallelize(0 until 50, 10).map(r => (r.toLong, r.toLong))
    val v2: RDD[(VertexId, VertexId)] = sc.parallelize(1001 until 1020, 10).map(r => (r.toLong, r.toLong))
    val v3: RDD[(VertexId, VertexId)] = sc.parallelize(2000 until 2200, 10).map(r => (r.toLong, r.toLong))

    // Edges link each vertex to its successor within a chain.
    val e: RDD[Edge[String]] = sc.parallelize(0 until 49, 10).map(r => Edge(r, r + 1, ""))
    val e2: RDD[Edge[String]] = sc.parallelize(1001 until 1019, 10).map(r => Edge(r, r + 1, ""))
    val e3: RDD[Edge[String]] = sc.parallelize(2000 until 2199, 10).map(r => Edge(r, r + 1, ""))

    val g: Graph[VertexId, String] = Graph(v ++ v2 ++ v3, e ++ e2 ++ e3)
    g.partitionBy(PartitionStrategy.EdgePartition2D)
  }
}

val g = SampleGraph.createGraph(sc)
val dir = "hdfs://${myhost}/user/me/run1"  // replace ${myhost} with your NameNode host
val (g2, count) = RobustConnectedComponents.run(g, 20, dir)
{code}

You should see convergence in 199 iterations (the longest chain has 200 vertices), and all intermediate files on your HDFS.
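As a quick sanity check (a sketch, not part of the original run), the three disjoint chains should collapse into exactly three components:
{code}
// The test graph is three disjoint chains, so after convergence there
// should be exactly three distinct component IDs.
val numComponents = g2.vertices.map(_._2).distinct().count()
assert(numComponents == 3)
{code}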

> GraphX Connected Components fail with large number of iterations
> ----------------------------------------------------------------
>
>                 Key: SPARK-10335
>                 URL: https://issues.apache.org/jira/browse/SPARK-10335
>             Project: Spark
>          Issue Type: Improvement
>          Components: GraphX
>    Affects Versions: 1.4.0
>         Environment: Tested with Yarn client mode
>            Reporter: JJ Zhang
>
> For graphs with long chains of connected vertices, the algorithm fails to converge in practice.
> The driver always runs out of memory prior to final convergence. In my test, on a graph of 1.2B vertices and 1.2B edges whose longest path requires more than 90 iterations, it invariably fails around iteration 80 with driver memory set at 50G. On top of that, each iteration takes longer than the previous one, and it took an overnight run on a 50-node cluster to reach the failure point.
> This is presumably due to the ever-growing RDD lineage and the DAG computation it entails, so truncating the RDD lineage is the most straightforward solution: reloading the vertices and edges from files yields fresh RDDs with no lineage, which bounds the DAG size.
> I tried using checkpoint, but apparently it does not work as expected in version 1.4.0. I will file another ticket for that issue; hopefully it has been resolved by 1.5.
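> For reference, a minimal sketch of the checkpoint-based truncation I had in mind (hypothetical; this is the approach that did not behave as expected in 1.4.0):
> {code}
> // Hypothetical sketch of RDD-level checkpointing every few iterations.
> // checkpoint() writes the RDD to the checkpoint directory and drops its lineage.
> sc.setCheckpointDir(dir)
> g.vertices.checkpoint()
> g.edges.checkpoint()
> // checkpoint() is lazy; an action is required to materialize it.
> g.vertices.count()
> g.edges.count()
> {code}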
> Proposed solution below. It was tested on the graph mentioned above and converged after 99 iterations in less than an hour.
> {code:title=RobustConnectedComponents.scala|borderStyle=solid}
> import scala.reflect.ClassTag
> import org.apache.spark.Logging
> import org.apache.spark.graphx._
> import org.apache.spark.rdd.RDD
> object RobustConnectedComponents extends Logging with java.io.Serializable {
>   def run[VD: ClassTag, ED: ClassTag](graph: Graph[VD, ED], interval: Int = 50, dir: String): (Graph[VertexId, String], Int) = {
>     // Initialize each vertex attribute to its own ID; edge attributes are unused.
>     val ccGraph = graph.mapVertices { case (vid, _) => vid }.mapEdges { _ => "" }
>     def sendMessage(edge: EdgeTriplet[VertexId, String]): Iterator[(VertexId, Long)] = {
>       if (edge.srcAttr < edge.dstAttr) {
>         Iterator((edge.dstId, edge.srcAttr))
>       } else if (edge.srcAttr > edge.dstAttr) {
>         Iterator((edge.srcId, edge.dstAttr))
>       } else {
>         Iterator.empty
>       }
>     }
>     val initialMessage = Long.MaxValue
>
>     var g: Graph[VertexId, String] = ccGraph
>     var i = interval
>     var count = 0
>     // pregel returns the number of iterations it actually ran. If it used the
>     // full interval, there may still be active messages: truncate the lineage
>     // by saving/reloading the graph, then run another interval.
>     while (i == interval) {
>       g = refreshGraph(g, dir, count)
>       g.cache()
>       val (g1, i1) = pregel(g, initialMessage, interval, activeDirection = EdgeDirection.Either)(
>         vprog = (id, attr, msg) => math.min(attr, msg),
>         sendMsg = sendMessage,
>         mergeMsg = (a, b) => math.min(a, b))
>       g.unpersist()
>       g = g1
>       i = i1
>       count = count + i
>       logInfo("Checkpoint reached. iteration so far: " + count)
>     }
>     logInfo("Final Converge: Total Iteration:" + count)
>     (g, count)
>   } // end of run
>   
>   def refreshGraph(g: Graph[VertexId, String], dir: String, count: Int): Graph[VertexId, String] = {
>     // Save the current vertices and edges to HDFS, then load them back.
>     // The reloaded RDDs carry no lineage, so the DAG stays bounded.
>     val vertFile = dir + "/iter-" + count + "/vertices"
>     val edgeFile = dir + "/iter-" + count + "/edges"
>     g.vertices.saveAsObjectFile(vertFile)
>     g.edges.saveAsObjectFile(edgeFile)
>
>     // load back
>     val v: RDD[(VertexId, VertexId)] = g.vertices.sparkContext.objectFile(vertFile)
>     val e: RDD[Edge[String]] = g.vertices.sparkContext.objectFile(edgeFile)
>     
>     val newGraph = Graph(v, e)
>     newGraph
>   }
>   def pregel[VD: ClassTag, ED: ClassTag, A: ClassTag](graph: Graph[VD, ED],
>     initialMsg: A,
>     maxIterations: Int = Int.MaxValue,
>     activeDirection: EdgeDirection = EdgeDirection.Either)(vprog: (VertexId, VD, A) => VD,
>       sendMsg: EdgeTriplet[VD, ED] => Iterator[(VertexId, A)],
>       mergeMsg: (A, A) => A): (Graph[VD, ED], Int) =
>     {
>       var g = graph.mapVertices((vid, vdata) => vprog(vid, vdata, initialMsg)).cache()
>       // compute the messages
>       var messages = g.mapReduceTriplets(sendMsg, mergeMsg)
>       var activeMessages = messages.count()
>       // Loop
>       var prevG: Graph[VD, ED] = null
>       var i = 0
>       while (activeMessages > 0 && i < maxIterations) {
>         // Receive the messages and update the vertices.
>         prevG = g
>         g = g.joinVertices(messages)(vprog).cache()
>         val oldMessages = messages
>         // Send new messages, skipping edges where neither side received a message. We must cache
>         // messages so it can be materialized on the next line, allowing us to uncache the previous
>         // iteration.
>         messages = g.mapReduceTriplets(
>           sendMsg, mergeMsg, Some((oldMessages, activeDirection))).cache()
>         // The call to count() materializes `messages` and the vertices of `g`. This hides oldMessages
>         // (depended on by the vertices of g) and the vertices of prevG (depended on by oldMessages
>         // and the vertices of g).
>         activeMessages = messages.count()
>         // Unpersist the RDDs hidden by newly-materialized RDDs
>         oldMessages.unpersist(blocking = false)
>         prevG.unpersistVertices(blocking = false)
>         prevG.edges.unpersist(blocking = false)
>         // count the iteration
>         i += 1
>       }
>       (g, i)
>     } // end of pregel
> }
> {code}


