Posted to issues@spark.apache.org by "Rohit (JIRA)" <ji...@apache.org> on 2016/11/28 10:54:58 UTC

[jira] [Commented] (SPARK-18568) vertex attributes in the edge triplet not getting updated in super steps for Pregel API

    [ https://issues.apache.org/jira/browse/SPARK-18568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15701622#comment-15701622 ] 

Rohit commented on SPARK-18568:
-------------------------------

Found the exact issue. If the vertex attribute is a complex object containing mutable objects, the edge triplet does not pick up the new state once the vertex attributes have already been shipped. If the vertex attributes are immutable objects, there is no issue. Below is the code to reproduce it. Just changing the mutable HashMap to an immutable HashMap solves the issue. (This is not a fix for the bug: either users should be made aware of this limitation, or the bug needs to be fixed for mutable objects.)
[~ankurd] Any suggestions on what I should look at, more specifically, to fix this bug? Thanks.

import org.apache.spark.graphx._
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.log4j.{ Level, Logger }
import scala.collection.mutable.HashMap

// Reproducer for SPARK-18568: the vertex attribute is a *mutable* HashMap that
// vProg updates in place. Switching to an immutable Map avoids the problem.
object PregelTest {

  def run(graph: Graph[HashMap[String, Int], HashMap[String, Int]]): Graph[HashMap[String, Int], HashMap[String, Int]] = {

    // Vertex program: sets LENGTH by mutating the attribute map in place
    // and returning the same object.
    def vProg(v: VertexId, attr: HashMap[String, Int], msg: Integer): HashMap[String, Int] = {
      if (msg < 0) {
        // initial message: vertex 0 is the source, every other vertex starts unreachable
        if (v == 0L) attr += ("LENGTH" -> 0)
        else attr += ("LENGTH" -> Integer.MAX_VALUE)
      } else {
        attr += ("LENGTH" -> (msg + 1))
      }
    }

    // Send the source's LENGTH to the destination, but only if the source is reachable.
    def sendMsg(triplet: EdgeTriplet[HashMap[String, Int], HashMap[String, Int]]): Iterator[(VertexId, Integer)] = {
      val len = triplet.srcAttr("LENGTH")
      if (len < Integer.MAX_VALUE) Iterator((triplet.dstId, Integer.valueOf(len)))
      else Iterator.empty
    }

    // Keep the smaller of two incoming messages.
    def mergeMsg(msg1: Integer, msg2: Integer): Integer = {
      if (msg1 < msg2) msg1 else msg2
    }

    // initial message -1, at most 3 supersteps
    Pregel(graph, Integer.valueOf(-1), 3, EdgeDirection.Either)(vProg, sendMsg, mergeMsg)
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Pregel Test").setMaster("local")
    val sc = new SparkContext(conf)

    // create the simplest test graph: 3 vertices and 2 edges (0 -> 1 -> 2)
    val vertexList = Array(
      (0L, new HashMap[String, Int]),
      (1L, new HashMap[String, Int]),
      (2L, new HashMap[String, Int]))
    val edgeList = Array(
      Edge(0L, 1L, new HashMap[String, Int]),
      Edge(1L, 2L, new HashMap[String, Int]))

    val vertexRdd = sc.parallelize(vertexList)
    val edgeRdd = sc.parallelize(edgeList)
    val g = Graph[HashMap[String, Int], HashMap[String, Int]](vertexRdd, edgeRdd)

    // run the test and print the final vertex attributes
    val lpa = run(g)
    lpa.vertices.collect().foreach(println)
    sc.stop()
  }
}
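Since switching to an immutable map is reported to avoid the problem, here is a minimal sketch of vProg rewritten that way. It is Spark-free so it runs on its own: VertexId is aliased to Long locally (GraphX also defines it as a Long alias), and the object name ImmutableVProg is hypothetical. The point is that attr + (...) returns a new Map rather than modifying the object GraphX already holds.

```scala
object ImmutableVProg {
  // Stand-in for org.apache.spark.graphx.VertexId, which GraphX also defines as Long.
  type VertexId = Long

  // Same logic as the reproducer's vProg, but attr is an immutable Map:
  // `+` returns a new Map and never modifies the object passed in.
  def vProg(v: VertexId, attr: Map[String, Int], msg: Integer): Map[String, Int] =
    if (msg.intValue < 0) {
      // initial message: vertex 0 is the source, all others start unreachable
      if (v == 0L) attr + ("LENGTH" -> 0)
      else attr + ("LENGTH" -> Integer.MAX_VALUE)
    } else {
      attr + ("LENGTH" -> (msg.intValue + 1))
    }

  def main(args: Array[String]): Unit = {
    val before = Map.empty[String, Int]
    val after = vProg(1L, before, Integer.valueOf(4))
    println(before) // Map()
    println(after)  // Map(LENGTH -> 5)
  }
}
```

In the full reproducer the graph type would become Graph[Map[String, Int], Map[String, Int]] and sendMsg's triplet type would change to match; the Pregel call itself stays the same.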

> vertex attributes in the edge triplet not getting updated in super steps for Pregel API
> ---------------------------------------------------------------------------------------
>
>                 Key: SPARK-18568
>                 URL: https://issues.apache.org/jira/browse/SPARK-18568
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 2.0.2
>            Reporter: Rohit
>
> When running the Pregel API with complex objects as vertex attributes, the vertex attributes are not updated in the triplet view. For example, if the attributes of vertex "a" change in the first superstep, the triplet's src attributes in the sendMsg program for the first superstep get the latest attributes of vertex "a". But if the attributes of "a" change again in vprog in the 2nd superstep, the edge triplets that have vertex "a" as src or destination are not updated with this new state. If I re-create the graph using g = Graph(g.vertices, g.edges) in the while loop before the next superstep, the attributes are updated, but this workaround is bad for performance. A detailed description of the bug, along with code to recreate it, is in the attached URL.
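One possible explanation, which is our reading of GraphX internals and has not been verified line by line against 2.0.2: when the attribute type does not change, mapVertices and outerJoinVertices (used by joinVertices, and therefore by Pregel) appear to find changed vertices by comparing old and new attribute values, in the style of VertexRDD.diff, and ship only those to the edge partitions. If vprog mutates the attribute in place and returns the same object, the "old" and "new" values are the same reference, so nothing looks changed and the triplet copies stay stale. An immutable Map yields a new object, so the change is detected. This would also fit the observation that Graph(g.vertices, g.edges) fixes it, since a freshly built graph ships every attribute. The toy model below mimics that mechanism in plain Scala; ShipOnlyChanged, ship, superstep and demo are hypothetical names, and arrays stand in for vertex and edge partitions.

```scala
import scala.collection.mutable

object ShipOnlyChanged {
  type Attr = mutable.HashMap[String, Int]

  // Stand-in for serialization: the edge-side replica gets its own copy.
  def ship(a: Attr): Attr = a.clone()

  // Runs vprog on every vertex, then refreshes only those replicas whose
  // previous and new attribute values compare unequal (!=).
  def superstep(master: Array[Attr], replica: Array[Attr], vprog: Attr => Attr): Array[Attr] = {
    val updated = master.map(vprog)
    for (i <- master.indices if master(i) != updated(i)) replica(i) = ship(updated(i))
    updated
  }

  // Returns the LENGTH value the replica sees after one superstep.
  def demo(inPlace: Boolean): Int = {
    val master: Array[Attr] = Array(mutable.HashMap("LENGTH" -> 0))
    val replica = master.map(ship)
    val vprog: Attr => Attr =
      if (inPlace) { a => a += ("LENGTH" -> 1) }  // mutates the shared object
      else { a => a.clone() += ("LENGTH" -> 1) }  // leaves the old object intact
    superstep(master, replica, vprog)
    replica(0)("LENGTH")
  }

  def main(args: Array[String]): Unit = {
    println(demo(inPlace = true))  // 0: stale, the update was never shipped
    println(demo(inPlace = false)) // 1: the change was detected and shipped
  }
}
```

If this model is right, returning a fresh copy from vprog (as in the inPlace = false branch) should help even with a mutable HashMap, but that has not been tested against GraphX itself.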



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
