Posted to user@spark.apache.org by Ghufran Malik <go...@gmail.com> on 2014/04/19 00:15:27 UTC
BFS implemented
Hi, I have successfully implemented the breadth-first search (BFS) algorithm
using the Pregel operator in GraphX, as follows:
val graph = GraphLoader.edgeListFile(sc, "graphx/data/test_graph.txt")

val root: VertexId = 1
val initialGraph = graph.mapVertices((id, _) =>
  if (id == root) 0.0 else Double.PositiveInfinity)

val bfs = initialGraph.pregel(Double.PositiveInfinity, 20)(
  (id, attr, msg) => math.min(attr, msg),
  triplet =>
    if (triplet.srcAttr != Double.PositiveInfinity)
      Iterator((triplet.dstId, triplet.srcAttr + 1))
    else
      Iterator.empty,
  (a, b) => math.min(a, b))

println(bfs.vertices.collect.mkString("\n"))
where the test_graph.txt is:
1 2
2 1
2 3
2 4
3 2
3 3
4 2
4 3
and the output after I run my algorithm is:
(4,2.0)
(2,1.0)
(3,2.0)
(1,0.0)
which is the correct result.
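For anyone who wants to sanity-check those distances without a Spark cluster, the same BFS on this toy edge list can be run as plain Scala with a standard queue (this is just a local cross-check, not the GraphX implementation; the object and method names are illustrative):

```scala
object BfsCheck {
  // The toy edge list from test_graph.txt above.
  val edges = Seq((1, 2), (2, 1), (2, 3), (2, 4), (3, 2), (3, 3), (4, 2), (4, 3))

  // Standard queue-based BFS from `root`, returning vertex -> hop distance.
  def bfs(root: Int): Map[Int, Double] = {
    val adj = edges.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2) }
    var dist = Map(root -> 0.0)
    var queue = scala.collection.immutable.Queue(root)
    while (queue.nonEmpty) {
      val (v, rest) = queue.dequeue
      queue = rest
      for (n <- adj.getOrElse(v, Seq.empty[Int]) if !dist.contains(n)) {
        dist += n -> (dist(v) + 1.0)
        queue = queue.enqueue(n)
      }
    }
    dist
  }

  def main(args: Array[String]): Unit =
    println(bfs(1).toSeq.sortBy(_._1).mkString("\n"))
}
```

Running it from vertex 1 yields the same distances as the Pregel version: 0 for vertex 1, 1 for vertex 2, and 2 for vertices 3 and 4.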
I was hoping someone could improve upon my implementation by suggesting a
way to avoid the maximum-iteration number (20). If I remove it, my job
continues for some time until eventually I receive the error:
7)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1896)
	at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
	at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
	at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1993)
	at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1918)
	....................carries on and on.................................
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
	at scala.collection.immutable.$colon$colon.readObject(List.scala:362)
	at sun.reflect.GeneratedMethodAccessor4.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:483)
14/04/18 23:11:14 ERROR TaskSetManager: Task 81094.0:0 failed 1 times; aborting job
14/04/18 23:11:14 INFO DAGScheduler: Failed to run reduce at VertexRDD.scala:91
14/04/18 23:11:14 INFO TaskSchedulerImpl: Remove TaskSet 81094.0 from pool
org.apache.spark.SparkException: Job aborted: Task 81094.0:0 failed 1 times (most recent failure: Exception failure: java.lang.StackOverflowError)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1028)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$abortStage$1.apply(DAGScheduler.scala:1026)
	at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
	at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$abortStage(DAGScheduler.scala:1026)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$processEvent$10.apply(DAGScheduler.scala:619)
	at scala.Option.foreach(Option.scala:236)
	at org.apache.spark.scheduler.DAGScheduler.processEvent(DAGScheduler.scala:619)
	at org.apache.spark.scheduler.DAGScheduler$$anonfun$start$1$$anon$2$$anonfun$receive$1.applyOrElse(DAGScheduler.scala:207)
	at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498)
	at akka.actor.ActorCell.invoke(ActorCell.scala:456)
	at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237)
	at akka.dispatch.Mailbox.run(Mailbox.scala:219)
	at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386)
	at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
	at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
	at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
	at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
Re: BFS implemented
Posted by Mayur Rustagi <ma...@gmail.com>.
It would be good if you could contribute this as an example. BFS is a
common enough algorithm.
Mayur Rustagi
Ph: +1 (760) 203 3257
http://www.sigmoidanalytics.com
@mayur_rustagi <https://twitter.com/mayur_rustagi>
On Sat, Apr 19, 2014 at 4:16 AM, Ghufran Malik <go...@gmail.com> wrote:
> Ahh nvm I found the solution :)
>
> triplet.srcAttr != Double.PositiveInfinity && triplet.dstAttr ==
> Double.PositiveInfinity
>
> as my new if condition.
>
>
> [quoted message shown above]
Fwd: BFS implemented
Posted by Ghufran Malik <go...@gmail.com>.
Ahh nvm I found the solution :)
triplet.srcAttr != Double.PositiveInfinity && triplet.dstAttr == Double.PositiveInfinity
as my new if condition.
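To see why this extra guard lets the job converge without a maxIterations cap, here is a small local simulation of the superstep loop, in plain Scala without Spark (the object and method names are illustrative, and this only mimics Pregel's semantics; it is not the GraphX API). Once every reachable vertex has a finite distance, no edge satisfies the send condition, the message stream dries up, and the loop halts on its own:

```scala
object PregelSim {
  // The toy edge list from test_graph.txt, with Long vertex ids.
  val edges = Seq((1L, 2L), (2L, 1L), (2L, 3L), (2L, 4L), (3L, 2L), (3L, 3L), (4L, 2L), (4L, 3L))

  // Returns the final vertex attributes and the number of supersteps taken.
  def run(root: Long): (Map[Long, Double], Int) = {
    // Initialize: root gets 0.0, everything else infinity, as in the post.
    var attr: Map[Long, Double] =
      edges.flatMap(e => Seq(e._1, e._2)).distinct
        .map(id => id -> (if (id == root) 0.0 else Double.PositiveInfinity)).toMap
    var supersteps = 0
    var active = true
    while (active) {
      // sendMsg with the corrected guard: source settled AND destination
      // still unsettled. mergeMsg is min, as in the original code.
      val msgs = edges.collect {
        case (src, dst) if attr(src) != Double.PositiveInfinity &&
                           attr(dst) == Double.PositiveInfinity =>
          dst -> (attr(src) + 1)
      }.groupBy(_._1).map { case (k, vs) => k -> vs.map(_._2).min }
      active = msgs.nonEmpty // no messages => the computation halts by itself
      if (active) {
        supersteps += 1
        // vertex program: take the min of the current attribute and the message
        attr = attr.map { case (id, a) => id -> math.min(a, msgs.getOrElse(id, a)) }
      }
    }
    (attr, supersteps)
  }

  def main(args: Array[String]): Unit = println(run(1L))
}
```

On this graph the simulation settles after two supersteps with the same distances the Pregel job produced, so the hard-coded 20 (or any cap) is no longer needed for termination.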
---------- Forwarded message ----------
From: Ghufran Malik <go...@gmail.com>
Date: 18 April 2014 23:15
Subject: BFS implemented
To: user@spark.apache.org
[original message shown above]