Posted to issues@spark.apache.org by "Aram Mkrtchyan (JIRA)" <ji...@apache.org> on 2015/08/19 17:21:46 UTC

[jira] [Comment Edited] (SPARK-5480) GraphX pageRank: java.lang.ArrayIndexOutOfBoundsException:

    [ https://issues.apache.org/jira/browse/SPARK-5480?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14703164#comment-14703164 ] 

Aram Mkrtchyan edited comment on SPARK-5480 at 8/19/15 3:21 PM:
----------------------------------------------------------------

According to our investigation, Spark stores a mapping for every partition based on the RDD's data and assumes the underlying data does not change during computation. When an RDD returns different results each time it is computed (its source may be mutable), the recomputed data may no longer match the mapping built from its previous state. This can happen even if you persist the RDD: for example, if it does not fit in cluster memory, an executor may lose a cached partition and have to recompute it. The EdgePartition's data stays the same, but the RDD's data can mutate (gain new items), so lookups for the new vertex ids fail.
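
To make the failure mode concrete, here is a simplified, assumed illustration (not the actual GraphXPrimitiveKeyOpenHashMap internals) of why the lookup ends up at index -1: each EdgePartition keeps a map from global vertex id to a local array index, frozen when the partition is first materialized, so a vertex id that only appears after recomputation is missing from the map.

// Simplified sketch; the real structure is GraphXPrimitiveKeyOpenHashMap.
val localIndex = Map(1L -> 0, 2L -> 1)   // built at first materialization
val attrs = Array(10, 20)                // local vertex attribute array
val idx = localIndex.getOrElse(3L, -1)   // 3L exists only after recomputation
attrs(idx)                               // java.lang.ArrayIndexOutOfBoundsException: -1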

The solution is not clear. Options include making EdgePartition updatable, or storing a snapshot of the current edges somewhere stable (HDFS, for example) and rebuilding the graph from that snapshot (a sketch of the snapshot approach follows the reproduction below).

This code pretty much explains what I mean:
import scala.util.Random
import org.apache.spark.graphx.{Edge, EdgeRDD, Graph}
import org.apache.spark.storage.StorageLevel

// getEdges returns the base edge collection; multiplying by a fresh random
// number makes the RDD return different edges on every computation.
val edges = EdgeRDD.fromEdges[Int, Int](sparkContext.makeRDD(getEdges, 5).map { t =>
  val r = Random.nextInt()
  Edge(r * t.srcId, r * t.dstId, t.attr)
})
val graph = Graph.fromEdges(edges, -1, StorageLevel.NONE, StorageLevel.NONE)
edges.persist()
edges.count()      // materializes one version of the random edges
edges.unpersist()  // drops it, so the next access recomputes different edges
graph.subgraph(epred = t => t.srcAttr != -1).triplets.collect()  // crashes here
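
As a sketch of the snapshot workaround mentioned above: write the edges out once, then build the graph only from the stable on-disk copy, so every recomputation re-reads identical data. The path and the tab-separated format here are illustrative assumptions, not part of any GraphX API.

// Hypothetical snapshot workaround; path and format are illustrative only.
val snapshotPath = "hdfs:///tmp/edge-snapshot"

// 1) Materialize one consistent version of the nondeterministic edges.
edges.map(e => s"${e.srcId}\t${e.dstId}\t${e.attr}")
  .saveAsTextFile(snapshotPath)

// 2) Rebuild the graph from the stable copy: re-reading a file yields the
//    same data on every recomputation, so the per-partition vertex
//    mappings stay consistent.
val stableEdges = sparkContext.textFile(snapshotPath).map { line =>
  val Array(src, dst, attr) = line.split("\t")
  Edge(src.toLong, dst.toLong, attr.toInt)
}
val stableGraph = Graph.fromEdges(stableEdges, -1)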


> GraphX pageRank: java.lang.ArrayIndexOutOfBoundsException: 
> -----------------------------------------------------------
>
>                 Key: SPARK-5480
>                 URL: https://issues.apache.org/jira/browse/SPARK-5480
>             Project: Spark
>          Issue Type: Bug
>          Components: GraphX
>    Affects Versions: 1.2.0, 1.3.1
>         Environment: Yarn client
>            Reporter: Stephane Maarek
>
> Running the following code:
>     val subgraph = graph.subgraph (
>       vpred = (id,article) => //working predicate)
>     ).cache()
>     println( s"Subgraph contains ${subgraph.vertices.count} nodes and ${subgraph.edges.count} edges")
>     val prGraph = subgraph.staticPageRank(5).cache
>     val titleAndPrGraph = subgraph.outerJoinVertices(prGraph.vertices) {
>       (v, title, rank) => (rank.getOrElse(0.0), title)
>     }
>     titleAndPrGraph.vertices.top(13) {
>       Ordering.by((entry: (VertexId, (Double, _))) => entry._2._1)
>     }.foreach(t => println(t._2._2._1 + ": " + t._2._1 + ", id:" + t._1))
> Returns a graph with 5000 nodes and 4000 edges.
> Then it crashes during the PageRank with the following:
> 15/01/29 05:51:07 INFO scheduler.TaskSetManager: Starting task 125.0 in stage 39.0 (TID 1808, *HIDDEN, PROCESS_LOCAL, 2059 bytes)
> 15/01/29 05:51:07 WARN scheduler.TaskSetManager: Lost task 107.0 in stage 39.0 (TID 1794, *HIDDEN): java.lang.ArrayIndexOutOfBoundsException: -1
>         at org.apache.spark.graphx.util.collection.GraphXPrimitiveKeyOpenHashMap$mcJI$sp.apply$mcJI$sp(GraphXPrimitiveKeyOpenHashMap.scala:64)
>         at org.apache.spark.graphx.impl.EdgePartition.updateVertices(EdgePartition.scala:91)
>         at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:75)
>         at org.apache.spark.graphx.impl.ReplicatedVertexView$$anonfun$2$$anonfun$apply$1.apply(ReplicatedVertexView.scala:73)
>         at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
>         at org.apache.spark.graphx.impl.EdgeRDDImpl$$anonfun$mapEdgePartitions$1.apply(EdgeRDDImpl.scala:110)
>         at org.apache.spark.graphx.impl.EdgeRDDImpl$$anonfun$mapEdgePartitions$1.apply(EdgeRDDImpl.scala:108)
>         at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
>         at org.apache.spark.rdd.RDD$$anonfun$13.apply(RDD.scala:601)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at org.apache.spark.rdd.ZippedPartitionsRDD2.compute(ZippedPartitionsRDD.scala:88)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:61)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:228)
>         at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:35)
>         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:263)
>         at org.apache.spark.rdd.RDD.iterator(RDD.scala:230)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:68)
>         at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:41)
>         at org.apache.spark.scheduler.Task.run(Task.scala:56)
>         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:196)
>         at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>         at java.lang.Thread.run(Thread.java:744)


