Posted to issues@spark.apache.org by "Apache Spark (JIRA)" <ji...@apache.org> on 2015/01/04 09:09:34 UTC

[jira] [Commented] (SPARK-5036) Better support sending partial messages in Pregel API

    [ https://issues.apache.org/jira/browse/SPARK-5036?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14263786#comment-14263786 ] 

Apache Spark commented on SPARK-5036:
-------------------------------------

User 'shijinkui' has created a pull request for this issue:
https://github.com/apache/spark/pull/3866

> Better support sending partial messages in Pregel API
> -----------------------------------------------------
>
>                 Key: SPARK-5036
>                 URL: https://issues.apache.org/jira/browse/SPARK-5036
>             Project: Spark
>          Issue Type: Improvement
>          Components: GraphX
>            Reporter: sjk
>         Attachments: s1.jpeg, s2.jpeg
>
>
> Better support sending partial messages in Pregel API
> 1. The requirement
> In many iterative graph algorithms, only some of the vertices (we call them ActiveVertexes) need to send messages to their neighbours in each iteration. In many cases, ActiveVertexes are the vertices whose attributes changed between the previous and the current iteration. To implement this requirement, we can use the Pregel API plus a flag (e.g., `isAttrChanged: Boolean`) in each vertex's attribute. 
> However, after `aggregateMessages` or `mapReduceTriplets` in each iteration, we need to reset this flag to its initial value in every vertex, which requires a costly `joinVertices`. 
> We found a more efficient way to meet this requirement and want to discuss it here.
> Consider a simple example.
> In the i-th iteration, the previous attribute of each vertex is `Attr` and the newly computed attribute is `NewAttr`:
> |VID| Attr| NewAttr| Neighbours|
> |:----|:-----|:----|:------|
> | 1 | 4| 5| 2, 3 |
> | 2 | 3| 2| 1, 4 |
> | 3 | 2| 2| 1, 4 |
> | 4|  3| 4| 1, 2, 3 |
> Our requirements are:
> 1. Set each vertex's `Attr` to `NewAttr` in the i-th iteration.
> 2. For each vertex whose `Attr != NewAttr`, send messages to its neighbours in the next iteration's `aggregateMessages`.
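The two requirements can be checked by hand on the table above. The sketch below is a hypothetical illustration that uses plain Scala collections instead of GraphX RDDs, and it assumes the Neighbours column lists the message targets of each vertex. Vertices 1, 2 and 4 have `Attr != NewAttr`, so only they send messages in the next iteration; vertex 3 stays silent.

```scala
// Hypothetical worked example on the i-th iteration table (not GraphX code):
// in-memory Maps stand in for VertexRDDs, and Neighbours are treated as message targets.
object ActiveVertexExample {
  type VertexId = Long

  // (Attr, NewAttr, Neighbours) per vertex, copied from the table.
  val table: Map[VertexId, (Int, Int, Seq[VertexId])] = Map(
    (1L, (4, 5, Seq(2L, 3L))),
    (2L, (3, 2, Seq(1L, 4L))),
    (3L, (2, 2, Seq(1L, 4L))),
    (4L, (3, 4, Seq(1L, 2L, 3L)))
  )

  // Requirement 2: a vertex is active only if Attr != NewAttr.
  val active: Set[VertexId] =
    table.filter { case (_, (attr, newAttr, _)) => attr != newAttr }.keySet

  // Requirement 1: every vertex's Attr becomes NewAttr.
  val updatedAttr: Map[VertexId, Int] =
    table.map { case (vid, (_, newAttr, _)) => vid -> newAttr }

  // (sender, receiver) pairs for the next iteration, produced by active vertices only.
  val messages: Seq[(VertexId, VertexId)] =
    active.toSeq.sorted.flatMap(src => table(src)._3.map(dst => (src, dst)))
}
```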
> We found it hard to implement these requirements efficiently with the current Pregel API, because we need not only `pregel()` to compute `NewAttr` (2) but also `outerJoinVertices()` to satisfy (1).
> A simple idea is to keep an `isAttrChanged: Boolean` (solution 1) or a `flag: Int` (solution 2) in each vertex's attribute.
> 2. Two solutions
> -----------
> 2.1 Solution 1: mark and reset the `isAttrChanged: Boolean` field of the vertex attribute
> ![Solution 1](s1.jpeg)
> 1. Initial messages: `aggregateMessages` returns a message RDD.
> 2. `innerJoin`: apply the user-defined function `vprog` to the messages received by each vertex. This returns a new VertexRDD holding the computed values, with `isAttrChanged = true`.
> 3. `outerJoinVertices`: merge the changed vertices into the whole graph, producing the new graph.
> 4. `aggregateMessages`: returns the message RDD for the next iteration.
> 5. `joinVertices`: reset `isAttrChanged` in every vertex attribute to false.
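>    ```scala
>    // Reset isAttrChanged to false in every vertex attribute.
>    // joinVertices requires its map function to return the vertex type VD,
>    // so reset() must return the attribute with isAttrChanged = false.
>    g = updateG.joinVertices(updateG.vertices) {
>      (vid, oriVertex, updateGVertex) => updateGVertex.reset()
>    }
>    ```
> This join visits every vertex just to set the vertex attribute object's `isAttrChanged` back to false.
> Without this reset, vertices flagged in an earlier iteration would send messages again in the next iteration even if their attributes did not change.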
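A minimal sketch of the five steps of solution 1, assuming a toy undirected graph and a min-label propagation `vprog` (both hypothetical, chosen only for illustration). In-memory `Map`s stand in for GraphX RDDs, and the helper `aggregate` only mimics what `aggregateMessages` does; none of this is the GraphX API. Requires Scala 2.13+ for `groupMapReduce`. Note the extra full pass over all vertices in step 5 on every iteration.

```scala
// Hypothetical in-memory simulation of solution 1 (not GraphX code).
object Solution1Sketch {
  type VertexId = Long
  final case class VAttr(value: Int, isAttrChanged: Boolean)

  // Toy undirected graph (assumed): messages flow both ways along each edge.
  val edges: Seq[(VertexId, VertexId)] = Seq((1L, 2L), (2L, 3L), (4L, 5L))

  // Mimics aggregateMessages: each endpoint that passes `send` offers its value
  // to the other endpoint; messages to the same vertex are merged with min.
  def aggregate(g: Map[VertexId, VAttr], send: VAttr => Boolean): Map[VertexId, Int] =
    edges
      .flatMap { case (a, b) => Seq((a, b), (b, a)) }
      .collect { case (src, dst) if send(g(src)) => (dst, g(src).value) }
      .groupMapReduce(_._1)(_._2)((x, y) => math.min(x, y))

  // Returns the final labels and the number of iterations run.
  def run(): (Map[VertexId, Int], Int) = {
    var g: Map[VertexId, VAttr] =
      (1L to 5L).map(vid => vid -> VAttr(vid.toInt, isAttrChanged = false)).toMap
    // Step 1: initial messages come from every vertex.
    var msgs = aggregate(g, _ => true)
    var iterations = 0
    while (msgs.nonEmpty) {
      iterations += 1
      // Step 2 (innerJoin + vprog): visit vertices that received a message and
      // flag those whose value actually changes.
      val changed = msgs.collect {
        case (vid, m) if m < g(vid).value => vid -> VAttr(m, isAttrChanged = true)
      }
      // Step 3 (outerJoinVertices): merge the changed vertices into the graph.
      g = g ++ changed
      // Step 4 (aggregateMessages): only flagged vertices send.
      msgs = aggregate(g, _.isAttrChanged)
      // Step 5 (joinVertices): full pass over every vertex to clear the flag.
      g = g.map { case (vid, a) => vid -> a.copy(isAttrChanged = false) }
    }
    (g.map { case (vid, a) => vid -> a.value }, iterations)
  }
}
```

On this toy graph the loop stops after 3 iterations with labels 1, 1, 1, 4, 4 for vertices 1 to 5.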
> **Result:**
> * Edges: 890041895
> * Vertices: 181640208
> * Iterations: 150
> * Total cost: 8.4h
> * Could not run until the message count reached 0
> 2.2 Solution 2: color the vertices
> ![Solution 2](s2.jpeg)
> Iteration process:
> 1. `innerJoin`
>   `vprog` is partially applied, e.g. `vprog(curIter, _: VertexId, _: VD, _: A)`, where `i = i + 1; val curIter = i`.
>   Inside `vprog`, the user can read `curIter` and assign it to `flag`.
> 2. `outerJoinVertices`
> 	`graph = graph.outerJoinVertices(changedVerts) { (vid, old, newOpt) => newOpt.getOrElse(old)}.cache()`
> 3. `aggregateMessages`
>   `sendMsg` is partially applied, e.g. `sendMsg(curIter, _: EdgeContext[VD, ED, A])`.
>   **In `sendMsg`, compare `curIter` with `flag` to decide whether to send a message.**
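The same toy computation under solution 2, again as a hypothetical in-memory sketch rather than GraphX code (toy graph, min-label `vprog` and the `aggregate` helper are assumptions; Scala 2.13+). `flag` records the iteration in which a vertex last changed, and both `vprog` and the send predicate are partially applied with `curIter`, mirroring `vprog(curIter, _: VertexId, _: VD, _: A)`. Because a flag set in an earlier iteration can never equal a later `curIter`, stale flags stop a vertex from sending without any reset pass.

```scala
// Hypothetical in-memory simulation of solution 2 (not GraphX code).
object Solution2Sketch {
  type VertexId = Long
  // flag holds the iteration in which the vertex value last changed (0 = never).
  final case class VAttr(value: Int, flag: Int)

  // Toy undirected graph (assumed), same as in the solution 1 sketch.
  val edges: Seq[(VertexId, VertexId)] = Seq((1L, 2L), (2L, 3L), (4L, 5L))

  // Mimics aggregateMessages with min as the merge function.
  def aggregate(g: Map[VertexId, VAttr], send: VAttr => Boolean): Map[VertexId, Int] =
    edges
      .flatMap { case (a, b) => Seq((a, b), (b, a)) }
      .collect { case (src, dst) if send(g(src)) => (dst, g(src).value) }
      .groupMapReduce(_._1)(_._2)((x, y) => math.min(x, y))

  // vprog with curIter as an extra first parameter: stamp curIter into flag on change.
  def vprog(curIter: Int, vid: VertexId, attr: VAttr, msg: Int): VAttr =
    if (msg < attr.value) VAttr(msg, curIter) else attr

  // sendMsg-style check: send only if the vertex changed in the current iteration.
  def sendMsg(curIter: Int, src: VAttr): Boolean = src.flag == curIter

  // Returns the final attributes and the number of iterations run.
  def run(): (Map[VertexId, VAttr], Int) = {
    var g = (1L to 5L).map(vid => vid -> VAttr(vid.toInt, flag = 0)).toMap
    var msgs = aggregate(g, _ => true) // initial messages from every vertex
    var i = 0
    while (msgs.nonEmpty) {
      i = i + 1
      val curIter = i
      val prog: (VertexId, VAttr, Int) => VAttr = vprog(curIter, _: VertexId, _: VAttr, _: Int)
      // innerJoin: run prog on vertices that received messages.
      val newVerts = msgs.map { case (vid, m) => vid -> prog(vid, g(vid), m) }
      // outerJoinVertices: newOpt.getOrElse(old).
      g = g ++ newVerts
      // aggregateMessages: no reset pass is needed before this step.
      val send: VAttr => Boolean = sendMsg(curIter, _: VAttr)
      msgs = aggregate(g, send)
    }
    (g, i)
  }
}
```

It reaches the same labels in the same 3 iterations; the final flags (0, 1, 2, 0, 1 for vertices 1 to 5) show the iteration in which each vertex last changed.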
> #### Result
> Raw data:
> * Vertices: 181640208
> * Edges: 890041895
> | | Average cost per iteration (s) | Cost of 150 iterations | Cost of 420 iterations |
> | ------------ | ------------- | ------------ | ------------ |
> | Solution 1 | 188 | 7.8h | cannot finish |
> | Solution 2 | 24 | 1.2h | 3.1h |
> | Speedup | 7x | 6.5x | solution 2 finished in 3.1h |
>     
> ## Conclusion
>     
> I think the second solution (Pregel + a flag) is better.
> It efficiently supports iterative graph algorithms in which only some of the vertices send messages to their neighbours in each iteration.
> We plan to use it in our production environment.
> PR: https://github.com/apache/spark/pull/3866
> ----EOF----



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
