Posted to user@spark.apache.org by fuz_woo <fu...@qq.com> on 2016/11/18 03:47:14 UTC

GraphX Pregel not update vertex state properly, cause messages loss

Hi everyone, I ran into a strange problem these days while trying to use
the GraphX Pregel interface to implement a simple single-source
shortest-path algorithm.
Below is my code:

import com.alibaba.fastjson.JSONObject
import org.apache.spark.graphx._

import org.apache.spark.{SparkConf, SparkContext}

object PregelTest {

  def run(graph: Graph[JSONObject, JSONObject]): Graph[JSONObject, JSONObject] = {

    def vProg(v: VertexId, attr: JSONObject, msg: Integer): JSONObject = {
      if ( msg < 0 ) {
        // init message received
        if ( v.equals(0.asInstanceOf[VertexId]) ) attr.put("LENGTH", 0)
        else attr.put("LENGTH", Integer.MAX_VALUE)
      } else {
        attr.put("LENGTH", msg+1)
      }
      attr
    }

    def sendMsg(triplet: EdgeTriplet[JSONObject, JSONObject]): Iterator[(VertexId, Integer)] = {
      val len = triplet.srcAttr.getInteger("LENGTH")
      // send a msg if last hub is reachable
      if ( len<Integer.MAX_VALUE ) Iterator((triplet.dstId, len))
      else Iterator.empty
    }

    def mergeMsg(msg1: Integer, msg2: Integer): Integer = {
      if ( msg1 < msg2 ) msg1 else msg2
    }

    Pregel(graph, new Integer(-1), 3, EdgeDirection.Out)(vProg, sendMsg, mergeMsg)
  }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("Pregel Test")
    conf.set("spark.master", "local")
    val sc = new SparkContext(conf)

    // create a simplest test graph with 3 nodes and 2 edges
    val vertexList = Array(
      (0.asInstanceOf[VertexId], new JSONObject()),
      (1.asInstanceOf[VertexId], new JSONObject()),
      (2.asInstanceOf[VertexId], new JSONObject()))
    val edgeList = Array(
      Edge(0.asInstanceOf[VertexId], 1.asInstanceOf[VertexId], new JSONObject()),
      Edge(1.asInstanceOf[VertexId], 2.asInstanceOf[VertexId], new JSONObject()))

    val vertexRdd = sc.parallelize(vertexList)
    val edgeRdd = sc.parallelize(edgeList)
    val g = Graph[JSONObject, JSONObject](vertexRdd, edgeRdd)

    // run test code
    val lpa = run(g)
    lpa
  }
}

After running the code I got an incorrect result: vertex 2 has a "LENGTH"
value of Integer.MAX_VALUE. It seems that the message sent to vertex 2 was
lost unexpectedly. I then stepped through with the debugger into
Pregel.scala, where I saw this code:

<http://apache-spark-user-list.1001560.n3.nabble.com/file/n28100/%E7%B2%98%E8%B4%B4%E5%9B%BE%E7%89%87.png> 
 
In iteration 0, the variable messages is reassigned at line 138 and then
evaluated at line 143, where activeMessages gets the value 0, which means
the message is lost.
I then set a breakpoint at line 138 and, before that line executed, evaluated
the expression " g.triplets().collect() ", which just collects the updated
graph data. After doing this and running the rest of the code, messages is no
longer empty and activeMessages gets the value 1 as expected.

I have tested the code with both Spark and GraphX 1.4 and 1.6 on Scala 2.10,
and got the same result.

I must say this problem really confuses me. I have spent almost 2 weeks
trying to resolve it and I have no idea what to do now. If this is not a bug,
I cannot understand why evaluating a side-effect-free expression
( g.triplets().collect(), which only collects the data and computes nothing )
could change the outcome.



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-state-properly-cause-messages-loss-tp28100.html
Sent from the Apache Spark User List mailing list archive at Nabble.com.

---------------------------------------------------------------------
To unsubscribe e-mail: user-unsubscribe@spark.apache.org


Re: GraphX Pregel not update vertex state properly, cause messages loss

Posted by 吴 郎 <fu...@qq.com>.
Thank you, Dale. I now understand in what situation this bug is triggered. It seems that any user-defined class with dynamic fields (such as Map, List, ...) cannot be used as a message, or it will be lost in the following supersteps. To work around this, I tried deep-copying a new message object every time the vertex program runs, and it has worked so far, though it is obviously not an elegant solution.
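
A minimal sketch of the copy-on-update workaround described above, written in plain Scala so it runs without Spark. The names DeepCopySketch and Attr are hypothetical, VertexId is declared locally as a Long alias (the same alias GraphX uses), and a mutable HashMap stands in for JSONObject; the thread does not show the actual copying code, so this is only one way it could look.

```scala
import scala.collection.mutable

object DeepCopySketch {
  // Declared locally so the sketch needs no Spark dependency (assumption for illustration).
  type VertexId = Long
  // Stand-in for the mutable vertex attribute (JSONObject in the original post).
  type Attr = mutable.HashMap[String, Int]

  // Vertex program that never writes to the incoming attribute:
  // it clones the map first and applies the update to the clone.
  def vProg(v: VertexId, attr: Attr, msg: Int): Attr = {
    val copy = attr.clone()
    if (msg < 0) copy.put("LENGTH", if (v == 0L) 0 else Int.MaxValue)
    else copy.put("LENGTH", msg + 1)
    copy
  }

  def main(args: Array[String]): Unit = {
    val before = mutable.HashMap("LENGTH" -> Int.MaxValue)
    val after = vProg(1L, before, 0)
    println(before("LENGTH")) // 2147483647, the old attribute is untouched
    println(after("LENGTH"))  // 1
    println(before eq after)  // false, a new object is returned
  }
}
```

For a map of Ints a shallow clone is already a full copy; an attribute holding nested mutable objects would need those copied as well.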

fuz woo
 
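------------------
Best regards,
吴   郎

-----------------------------------------------------------
College of Computer Science, National University of Defense Technology

Kaifu District, Changsha, Hunan Province; postal code 410073
Email: fuz.woo@qq.com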






 




------------------ Original ------------------
From: "Dale Wang"<w....@gmail.com>; 
Date: Thursday, November 24, 2016, 11:10 AM
To: "吴 郎"<fu...@qq.com>; 
Cc: "user"<us...@spark.apache.org>; 
Subject: Re: GraphX Pregel not update vertex state properly, cause messages loss





Re: GraphX Pregel not update vertex state properly, cause messages loss

Posted by Dale Wang <w....@gmail.com>.
The problem comes from an inconsistency between the graph’s triplet view and
its vertex view. The message is probably not lost; it is simply never sent,
because the sendMsg function reads a wrong value of srcAttr!

It is not a new bug. I met a similar bug before, in version 1.2.1; see
SPARK-6378 <https://issues.apache.org/jira/browse/SPARK-6378>. I can
reproduce that inconsistency with a small and simple program (see that JIRA
issue for more details). It seems that in some situations the triplet view of
a Graph object is not updated consistently with the vertex view. The GraphX
Pregel API relies heavily on the mapReduceTriplets (old) / aggregateMessages
(new) API, which in turn relies heavily on the correct behavior of the
triplet view of a graph. Thus this bug affects the behavior of the Pregel
API.

Although I cannot figure out why the bug appears either, I suspect that it
has some connection with the data type of the vertex property. If you use
*primitive* types such as Double and Long, it is OK. But if you use a
self-defined type with mutable fields, such as a mutable Map or a mutable
ArrayBuffer, the bug appears. In your case I notice that you use JSONObject
as your vertex’s data type. Looking at the definition of JSONObject, it
stores its data in a Java map field, which is mutable. To avoid the bug for
now, you can change the data type of your vertex property so that it contains
no mutable data: replace mutable collections with the immutable collections
provided by Scala, and replace var fields with val fields. At least, that
suggestion works for me.
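
A minimal sketch of that suggestion, assuming the vertex only needs to carry the path length. The case class VertexState, the object name, and the local VertexId alias are hypothetical and introduced purely for illustration; the code is plain Scala and does not touch GraphX itself.

```scala
object ImmutableAttrSketch {
  // Declared locally so the sketch needs no Spark dependency (assumption for illustration).
  type VertexId = Long

  // Immutable vertex attribute: a single val field, no mutable collection inside.
  final case class VertexState(length: Int)

  // Same logic as the vProg in the original post, but every update
  // returns a new VertexState instead of modifying the old one.
  def vProg(v: VertexId, attr: VertexState, msg: Int): VertexState =
    if (msg < 0) VertexState(if (v == 0L) 0 else Int.MaxValue)
    else attr.copy(length = msg + 1)

  def main(args: Array[String]): Unit = {
    val old = VertexState(Int.MaxValue)
    val updated = vProg(1L, old, 0)
    println(old)     // VertexState(2147483647)
    println(updated) // VertexState(1)
  }
}
```

Because the old and the new value are distinct objects with different contents, anything that compares them can see that the vertex changed.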

Zhaokang Wang
​


Re: GraphX Pregel not update vertex state properly, cause messages loss

Posted by rohit13k <ro...@gmail.com>.
Found the exact issue. If the vertex attribute is a complex object containing
mutable objects, the edge triplet does not pick up the new state once the
vertex attributes have already been shipped. If the vertex attributes are
immutable objects, there is no issue. Below is code that shows this. Just
changing the mutable HashMap to an immutable HashMap solves the issue. (This
is not a fix for the bug: either users should be made aware of this
limitation, or the bug needs to be fixed for mutable objects.)

import org.apache.spark.graphx._
import org.apache.spark.{ SparkConf, SparkContext }
import org.apache.log4j.Logger
import org.apache.log4j.Level
// mutable HashMap: this is the variant that shows the problem
import scala.collection.mutable.HashMap


object PregelTest {
  val logger = Logger.getLogger(getClass().getName())

  def run(graph: Graph[HashMap[String, Int], HashMap[String, Int]]): Graph[HashMap[String, Int], HashMap[String, Int]] = {

    def vProg(v: VertexId, attr: HashMap[String, Int], msg: Integer): HashMap[String, Int] = {
      var updatedAttr = attr

      if (msg < 0) {
        // init message received
        if (v.equals(0.asInstanceOf[VertexId])) updatedAttr = attr.+=("LENGTH" -> 0)
        else updatedAttr = attr.+=("LENGTH" -> Integer.MAX_VALUE)
      } else {
        updatedAttr = attr.+=("LENGTH" -> (msg + 1))
      }
      updatedAttr
    }

    def sendMsg(triplet: EdgeTriplet[HashMap[String, Int], HashMap[String, Int]]): Iterator[(VertexId, Integer)] = {
      val len = triplet.srcAttr.get("LENGTH").get
      // send a msg if last hub is reachable
      if (len < Integer.MAX_VALUE) Iterator((triplet.dstId, len))
      else Iterator.empty
    }

    def mergeMsg(msg1: Integer, msg2: Integer): Integer = {
      if (msg1 < msg2) msg1 else msg2
    }

    Pregel(graph, new Integer(-1), 3, EdgeDirection.Either)(vProg, sendMsg, mergeMsg)
  }

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.OFF)
    Logger.getLogger("akka").setLevel(Level.OFF)
    val conf = new SparkConf().setAppName("Pregel Test")
    conf.set("spark.master", "local")
    val sc = new SparkContext(conf)

    // create a simplest test graph with 3 nodes and 2 edges
    val vertexList = Array(
      (0.asInstanceOf[VertexId], new HashMap[String, Int]),
      (1.asInstanceOf[VertexId], new HashMap[String, Int]),
      (2.asInstanceOf[VertexId], new HashMap[String, Int]))
    val edgeList = Array(
      Edge(0.asInstanceOf[VertexId], 1.asInstanceOf[VertexId], new HashMap[String, Int]),
      Edge(1.asInstanceOf[VertexId], 2.asInstanceOf[VertexId], new HashMap[String, Int]))

    val vertexRdd = sc.parallelize(vertexList)
    val edgeRdd = sc.parallelize(edgeList)
    val g = Graph[HashMap[String, Int], HashMap[String, Int]](vertexRdd, edgeRdd)

    // run test code and print the resulting vertex attributes
    val lpa = run(g)
    lpa.vertices.collect().foreach(println)
  }
}
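
For comparison, here is a rough sketch of what the vertex program could look like with the immutable HashMap mentioned above. It is plain Scala without Spark; the object name, the Attr alias, the local VertexId alias, and the helper outgoing (which mirrors the check done in sendMsg) are all assumptions made for illustration.

```scala
import scala.collection.immutable.HashMap

object ImmutableMapSketch {
  // Declared locally so the sketch needs no Spark dependency (assumption for illustration).
  type VertexId = Long
  type Attr = HashMap[String, Int]

  // updated(...) returns a new map and leaves the incoming attribute unchanged.
  def vProg(v: VertexId, attr: Attr, msg: Int): Attr =
    if (msg < 0) attr.updated("LENGTH", if (v == 0L) 0 else Int.MaxValue)
    else attr.updated("LENGTH", msg + 1)

  // Mirrors the test in sendMsg: a source vertex only sends once its LENGTH is finite.
  def outgoing(srcAttr: Attr): Option[Int] =
    srcAttr.get("LENGTH").filter(_ < Int.MaxValue)

  def main(args: Array[String]): Unit = {
    val empty = HashMap.empty[String, Int]
    val v0 = vProg(0L, empty, -1)
    val v1 = vProg(1L, empty, -1)
    println(outgoing(v0))    // Some(0)
    println(outgoing(v1))    // None
    val v1Updated = vProg(1L, v1, 0)
    println(v1Updated("LENGTH")) // 1
    println(v1("LENGTH"))        // 2147483647, the old map is unchanged
  }
}
```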



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-state-properly-cause-messages-loss-tp28100p28139.html


Re: GraphX Pregel not update vertex state properly, cause messages loss

Posted by rohit13k <ro...@gmail.com>.
Created a JIRA issue for this:

https://issues.apache.org/jira/browse/SPARK-18568



--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-state-properly-cause-messages-loss-tp28100p28124.html


Re: GraphX Pregel not update vertex state properly, cause messages loss

Posted by rohit13k <ro...@gmail.com>.
Hi 

I am facing a similar issue. It is not that the message is getting lost.
The attribute of vertex 1 changes in superstep 1, but when sendMsg reads
the vertex attribute from the edge triplet in the 2nd superstep it still
has the old value of vertex 1 and not the latest value. So with your code
no new message is generated in that superstep. I think the bug is in the
replicatedVertexView, where the srcAttr and dstAttr of the EdgeTriplet are
updated from the latest version of the vertex after each superstep.
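
One possible reading of this symptom, which is an assumption and not something confirmed in this thread: if the decision about which vertices changed is made by comparing the attribute before and after the vertex program, an attribute that was mutated in place compares equal to itself, so nothing would be pushed to the triplet view. The plain-Scala sketch below only illustrates that comparison effect; looksChanged is a hypothetical stand-in and not GraphX code.

```scala
import scala.collection.mutable

object StaleViewSketch {
  type Attr = mutable.HashMap[String, Int]

  // Hypothetical change check (not GraphX code): a vertex counts as
  // updated only if the old and the new attribute differ.
  def looksChanged[A](before: A, after: A): Boolean = before != after

  // Mutates the attribute in place and returns the very same object.
  def updateInPlace(attr: Attr, len: Int): Attr = {
    attr.put("LENGTH", len)
    attr
  }

  // Leaves the input untouched and returns a fresh immutable map.
  def updateByCopy(attr: Map[String, Int], len: Int): Map[String, Int] =
    attr.updated("LENGTH", len)

  def main(args: Array[String]): Unit = {
    val mutableAttr = mutable.HashMap("LENGTH" -> Int.MaxValue)
    // Old and new are the same object, so the update is invisible to the check.
    println(looksChanged(mutableAttr, updateInPlace(mutableAttr, 1))) // false

    val immutableAttr = Map("LENGTH" -> Int.MaxValue)
    // Old and new are different values, so the update is detected.
    println(looksChanged(immutableAttr, updateByCopy(immutableAttr, 1))) // true
  }
}
```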

How can I get this bug raised? I am struggling to find a proper solution for
it, other than recreating the graph after every superstep to force the edge
triplets to pick up the latest vertex values. But that is not a good solution
performance-wise.





--
View this message in context: http://apache-spark-user-list.1001560.n3.nabble.com/GraphX-Pregel-not-update-vertex-state-properly-cause-messages-loss-tp28100p28123.html