You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@spark.apache.org by "ding (JIRA)" <ji...@apache.org> on 2016/09/13 22:15:20 UTC

[jira] [Commented] (SPARK-17097) Pregel does not keep vertex state properly; fails to terminate

    [ https://issues.apache.org/jira/browse/SPARK-17097?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15488609#comment-15488609 ] 

ding commented on SPARK-17097:
------------------------------

I am afraid the attached sample code fail to terminate with case class is not caused by Pregel or graphx bug. As in the sample code, vertices(inLabels here) is updated inplace which supposed not to happen. In this way, after transform operations, the original vertices is also updated and it has exactly the same value with the new generated vertices in above code when VD is case class. It leads to fail to update EdgeTriplet as there is no difference of the vertices. So EdgeTriplet dstLabels is always empty while srcLabels contains a value. And there is always active message which lead to Pregel not terminate.

One way to fix the problem is remove inplace update in vertexProgram by clone the labels and make update in the new labels. I have tried below code and it works.
    def vertexProgram (vertexId: VertexId, vdata: TestVertex, message: Vector[String]): TestVertex = {
      val labels = vdata.labels.clone() 
      message.foreach {
        case `startString` =>
          if (vdata.id == 0L)
            labels.add(vdata.value )  

        case m =>
          if (!vdata.labels.contains(m))
            labels.add(m)
      }
      new TestVertex(vdata.id, vdata.value, labels)
    }

Hope this information is helpful to you.

> Pregel does not keep vertex state properly; fails to terminate 
> ---------------------------------------------------------------
>
>                 Key: SPARK-17097
>                 URL: https://issues.apache.org/jira/browse/SPARK-17097
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 1.6.0
>         Environment: Scala 2.10.5, Spark 1.6.0 with GraphX and Pregel
>            Reporter: Seth Bromberger
>
> Consider the following minimum example:
> {code:title=PregelBug.scala|borderStyle=solid}
> package testGraph
> import org.apache.spark.{SparkConf, SparkContext}
> import org.apache.spark.graphx.{Edge, EdgeTriplet, Graph, _}
> object PregelBug {
>   def main(args: Array[String]) = {
>     //FIXME breaks if TestVertex is a case class; works if not case class
>     case class TestVertex(inId: VertexId,
>                      inData: String,
>                      inLabels: collection.mutable.HashSet[String]) extends Serializable {
>       val id = inId
>       val value = inData
>       val labels = inLabels
>     }
>     class TestLink(inSrc: VertexId, inDst: VertexId, inData: String) extends Serializable  {
>       val src = inSrc
>       val dst = inDst
>       val data = inData
>     }
>     val startString = "XXXSTARTXXX"
>     val conf = new SparkConf().setAppName("pregeltest").setMaster("local[*]")
>     val sc = new SparkContext(conf)
>     val vertexes = Vector(
>       new TestVertex(0, "label0", collection.mutable.HashSet[String]()),
>       new TestVertex(1, "label1", collection.mutable.HashSet[String]())
>     )
>     val links = Vector(
>       new TestLink(0, 1, "linkData01")
>     )
>     val vertexes_packaged = vertexes.map(v => (v.id, v))
>     val links_packaged = links.map(e => Edge(e.src, e.dst, e))
>     val graph = Graph[TestVertex, TestLink](sc.parallelize(vertexes_packaged), sc.parallelize(links_packaged))
>     def vertexProgram (vertexId: VertexId, vdata: TestVertex, message: Vector[String]): TestVertex = {
>       message.foreach {
>         case `startString` =>
>           if (vdata.id == 0L)
>             vdata.labels.add(vdata.value)
>         case m =>
>           if (!vdata.labels.contains(m))
>             vdata.labels.add(m)
>       }
>       new TestVertex(vdata.id, vdata.value, vdata.labels)
>     }
>     def sendMessage (triplet: EdgeTriplet[TestVertex, TestLink]): Iterator[(VertexId, Vector[String])] = {
>       val srcLabels = triplet.srcAttr.labels
>       val dstLabels = triplet.dstAttr.labels
>       val msgsSrcDst = srcLabels.diff(dstLabels)
>         .map(label => (triplet.dstAttr.id, Vector[String](label)))
>       val msgsDstSrc = dstLabels.diff(dstLabels)
>         .map(label => (triplet.srcAttr.id, Vector[String](label)))
>       msgsSrcDst.toIterator ++ msgsDstSrc.toIterator
>     }
>     def mergeMessage (m1: Vector[String], m2: Vector[String]): Vector[String] = m1.union(m2).distinct
>     val g = graph.pregel(Vector[String](startString))(vertexProgram, sendMessage, mergeMessage)
>     println("---pregel done---")
>     println("vertex info:")
>     g.vertices.foreach(
>       v => {
>         val labels = v._2.labels
>         println(
>           "vertex " + v._1 +
>             ": name = " + v._2.id +
>             ", labels = " + labels)
>       }
>     )
>   }
> }
> {code}
> This code never terminates even though we expect it to. To fix, we simply remove the "case" designation for the TestVertex class (see FIXME comment), and then it behaves as expected.
> (Apologies if this has been fixed in later versions; we're unfortunately pegged to 2.10.5 / 1.6.0 for now.)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@spark.apache.org
For additional commands, e-mail: issues-help@spark.apache.org