You are viewing a plain text version of this content. The canonical link for it is here.
Posted to user@spark.apache.org by Ghufran Malik <go...@gmail.com> on 2014/04/19 00:15:27 UTC

BFS implemented

Hi I have sucessfully implemented the Breadth First Search algorithm using
the Pregel operator in graphX as follows:

val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")

val root: VertexId = 1
val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else
Double.PositiveInfinity)


val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)( (id, attr, msg)
=> math.min(attr, msg), triplet => { if (triplet.srcAttr !=
Double.PositiveInfinity) { Iterator((triplet.dstId, triplet.srcAttr+1)) }
else { Iterator.empty } }, (a,b) => math.min(a,b) )

println(bfs.vertices.collect.mkString("\n"))

where the test_graph.txt is:
1 2
2 1
2 3
2 4
3 2
3 3
4 2
4 3

and the result outputted after I run my algorithm is:
(4,2.0)
(2,1.0)
(3,2.0)
(1,0.0)

which is the correct result.

I was hoping someone could improve upon my implementation by suggesting a
way in which I do not need the max iteration number (20). If I remove this
my job will continue on for sometime until eventual I receive the error:


7)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at
....................carries on and on.................................
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
14/04/18 23:11:14 ERROR TaskSetManager: Task 81094.0:0 failed 1 times;
aborting job
14/04/18 23:11:14 INFO DAGScheduler: Failed to run reduce at
VertexRDD.scala:91
14/04/18 23:11:14 INFO TaskSchedulerImpl: Remove TaskSet 81094.0 from pool
org.apache.spark.SparkException: Job aborted: Task 81094.0:0 failed 1 times
(most recent failure: Exception failure: java.lang.StackOverflowError)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

Re: BFS implemented

Posted by Mayur Rustagi <ma...@gmail.com>.
would be good if you can contribute this as an example. BFS is a common
enough algo.

Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>



On Sat, Apr 19, 2014 at 4:16 AM, Ghufran Malik <go...@gmail.com> wrote:

> Ahh nvm I found the solution :)
>
> triplet.srcAttr != Double.PositiveInfinity && triplet.dstAttr ==
> Double.PositiveInfinity
>
> as my new if condition.
>
>
> ---------- Forwarded message ----------
> From: Ghufran Malik <go...@gmail.com>
> Date: 18 April 2014 23:15
> Subject: BFS implemented
> To: user@spark.apache.org
>
>
> Hi I have sucessfully implemented the Breadth First Search algorithm using
> the Pregel operator in graphX as follows:
>
> val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")
>
> val root: VertexId = 1
> val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else
> Double.PositiveInfinity)
>
>
> val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)( (id, attr,
> msg) => math.min(attr, msg), triplet => { if (triplet.srcAttr !=
> Double.PositiveInfinity) { Iterator((triplet.dstId, triplet.srcAttr+1)) }
> else { Iterator.empty } }, (a,b) => math.min(a,b) )
>
> println(bfs.vertices.collect.mkString("\n"))
>
> where the test_graph.txt is:
> 1 2
> 2 1
> 2 3
> 2 4
> 3 2
> 3 3
> 4 2
> 4 3
>
> and the result outputted after I run my algorithm is:
> (4,2.0)
> (2,1.0)
> (3,2.0)
> (1,0.0)
>
> which is the correct result.
>
> I was hoping someone could improve upon my implementation by suggesting a
> way in which I do not need the max iteration number (20). If I remove this
> my job will continue on for sometime until eventual I receive the error:
>
>
> 7)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
>     at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
>     at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
>     at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
>     at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
>     at
> ....................carries on and on.................................
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
>     at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
>     at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
>     at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>     at java.lang.reflect.Method.invoke(Method.java:483)
> 14/04/18 23:11:14 ERROR TaskSetManager: Task 81094.0:0 failed 1 times;
> aborting job
> 14/04/18 23:11:14 INFO DAGScheduler: Failed to run reduce at
> VertexRDD.scala:91
> 14/04/18 23:11:14 INFO TaskSchedulerImpl: Remove TaskSet 81094.0 from pool
> org.apache.spark.SparkException: Job aborted: Task 81094.0:0 failed 1
> times (most recent failure: Exception failure: java.lang.StackOverflowError)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
>     at
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>     at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
>     at org.apache.spark.scheduler.DAGScheduler.org
> $apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
>     at scala.Option.foreach(Option.scala:236)
>     at
> org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
>     at
> org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
>     at akka.actor.ActorCell.invoke(ActorCell.scala:456)
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
>     at akka.dispatch.Mailbox.run(Mailbox.scala:219)
>     at
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
>     at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>     at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>     at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>     at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
>
>
>
>

Fwd: BFS implemented

Posted by Ghufran Malik <go...@gmail.com>.
Ahh nvm I found the solution :)

triplet.srcAttr != Double.PositiveInfinity && triplet.dstAttr ==
Double.PositiveInfinity

as my new if condition.

---------- Forwarded message ----------
From: Ghufran Malik <go...@gmail.com>
Date: 18 April 2014 23:15
Subject: BFS implemented
To: user@spark.apache.org


Hi I have sucessfully implemented the Breadth First Search algorithm using
the Pregel operator in graphX as follows:

val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")

val root: VertexId = 1
val initialGraph = graph.mapVertices((id, _) => if (id == root) 0.0 else
Double.PositiveInfinity)


val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)( (id, attr, msg)
=> math.min(attr, msg), triplet => { if (triplet.srcAttr !=
Double.PositiveInfinity) { Iterator((triplet.dstId, triplet.srcAttr+1)) }
else { Iterator.empty } }, (a,b) => math.min(a,b) )

println(bfs.vertices.collect.mkString("\n"))

where the test_graph.txt is:
1 2
2 1
2 3
2 4
3 2
3 3
4 2
4 3

and the result outputted after I run my algorithm is:
(4,2.0)
(2,1.0)
(3,2.0)
(1,0.0)

which is the correct result.

I was hoping someone could improve upon my implementation by suggesting a
way in which I do not need the max iteration number (20). If I remove this
my job will continue on for sometime until eventual I receive the error:


7)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
    at
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
    at
....................carries on and on.................................
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
    at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
    at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:483)
14/04/18 23:11:14 ERROR TaskSetManager: Task 81094.0:0 failed 1 times;
aborting job
14/04/18 23:11:14 INFO DAGScheduler: Failed to run reduce at
VertexRDD.scala:91
14/04/18 23:11:14 INFO TaskSchedulerImpl: Remove TaskSet 81094.0 from pool
org.apache.spark.SparkException: Job aborted: Task 81094.0:0 failed 1 times
(most recent failure: Exception failure: java.lang.StackOverflowError)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
    at
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
    at org.apache.spark.scheduler.DAGScheduler.org
$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
    at scala.Option.foreach(Option.scala:236)
    at
org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
    at
org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
    at akka.actor.ActorCell.invoke(ActorCell.scala:456)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
    at akka.dispatch.Mailbox.run(Mailbox.scala:219)
    at
akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at
scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at
scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at
scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)