Posted to dev@spark.apache.org by Takeshi Yamamuro <li...@gmail.com> on 2014/12/06 11:40:42 UTC
Re: a question of Graph build api
Hi,
Yes, I think so.
However, EdgePartitionBuilder may keep its edge list in a type other than
Edge in the future, because such a change could make graph construction
faster.
See also SPARK-1987.
https://issues.apache.org/jira/browse/SPARK-1987
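
To make the trade-off concrete, here is a minimal sketch in plain Scala with no Spark dependency. Edge, ObjectEdgeBuilder, and ColumnarEdgeBuilder are hypothetical stand-ins, not the real GraphX classes, and the columnar layout is only an assumption about what "other types instead of Edge" could look like. If the builder keeps Edge objects, accepting an Edge avoids a second allocation; if it keeps separate fields, an add(src, dst, attr) signature fits better.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical stand-in for the GraphX Edge type (vertex ids simplified to Long).
final case class Edge[ED](srcId: Long, dstId: Long, attr: ED)

// Variant A (assumption): the builder stores Edge objects.
// Passing the Edge itself lets the builder reuse the caller's instance.
final class ObjectEdgeBuilder[ED] {
  private val edges = ArrayBuffer.empty[Edge[ED]]

  // Reuses the caller's object; nothing new is allocated here.
  def add(e: Edge[ED]): Unit = { edges += e; () }

  // Field-based variant: has to allocate a fresh Edge for every call.
  def add(src: Long, dst: Long, attr: ED): Unit = { edges += Edge(src, dst, attr); () }

  def result: Vector[Edge[ED]] = edges.toVector
}

// Variant B (assumption): the builder stores the fields in parallel buffers.
// No Edge object is kept, so an Edge parameter would bring no benefit.
final class ColumnarEdgeBuilder[ED] {
  private val srcIds = ArrayBuffer.empty[Long]
  private val dstIds = ArrayBuffer.empty[Long]
  private val attrs  = ArrayBuffer.empty[ED]

  def add(src: Long, dst: Long, attr: ED): Unit = {
    srcIds += src
    dstIds += dst
    attrs += attr
    ()
  }

  def size: Int = srcIds.size
  def edgeAt(i: Int): (Long, Long, ED) = (srcIds(i), dstIds(i), attrs(i))
}
```

With Variant A, builder.add(e) stores the same instance the caller passed in. With Variant B, the caller only needs e.srcId, e.dstId, and e.attr, which is the shape of the current builder.add call in the quoted code.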
Thanks,
takeshi
On Fri, Dec 5, 2014 at 12:12 AM, jinkui.sjk <ji...@alibaba-inc.com>
wrote:
> Hi all,
>
> When building a graph from edge tuples with the Graph.fromEdgeTuples API,
> the edges object has type RDD[Edge]. Inside EdgeRDD.fromEdges, it would be
> better for EdgePartitionBuilder.add to take an Edge object as its parameter,
> so there is no need to create a new Edge object again.
>
>
>
> def fromEdgeTuples[VD: ClassTag](
>     rawEdges: RDD[(VertexId, VertexId)],
>     defaultValue: VD,
>     uniqueEdges: Option[PartitionStrategy] = None,
>     edgeStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY,
>     vertexStorageLevel: StorageLevel = StorageLevel.MEMORY_ONLY): Graph[VD, Int] =
> {
>   // Each (src, dst) tuple becomes an Edge with attribute 1 here.
>   val edges = rawEdges.map(p => Edge(p._1, p._2, 1))
>   val graph = GraphImpl(edges, defaultValue, edgeStorageLevel, vertexStorageLevel)
>   uniqueEdges match {
>     case Some(p) => graph.partitionBy(p).groupEdges((a, b) => a + b)
>     case None => graph
>   }
> }
>
>
>
>
> object GraphImpl {
>
>   /** Create a graph from edges, setting referenced vertices to `defaultVertexAttr`. */
>   def apply[VD: ClassTag, ED: ClassTag](
>       edges: RDD[Edge[ED]],
>       defaultVertexAttr: VD,
>       edgeStorageLevel: StorageLevel,
>       vertexStorageLevel: StorageLevel): GraphImpl[VD, ED] = {
>     fromEdgeRDD(EdgeRDD.fromEdges(edges), defaultVertexAttr, edgeStorageLevel, vertexStorageLevel)
>   }
>
>
>
> object EdgeRDD {
>   /**
>    * Creates an EdgeRDD from a set of edges.
>    *
>    * @tparam ED the edge attribute type
>    * @tparam VD the type of the vertex attributes that may be joined with the returned EdgeRDD
>    */
>   def fromEdges[ED: ClassTag, VD: ClassTag](edges: RDD[Edge[ED]]): EdgeRDD[ED, VD] = {
>     val edgePartitions = edges.mapPartitionsWithIndex { (pid, iter) =>
>       val builder = new EdgePartitionBuilder[ED, VD]
>       iter.foreach { e =>
>         // The Edge is unpacked into its fields before being handed to the builder.
>         builder.add(e.srcId, e.dstId, e.attr)
>       }
>       Iterator((pid, builder.toEdgePartition))
>     }
>     EdgeRDD.fromEdgePartitions(edgePartitions)
>   }
>
>
>