You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/11/27 16:02:44 UTC
[3/3] incubator-flink git commit: [scala] Fix slow building of adjacency list in Scala PageRank
[scala] Fix slow building of adjacency list in Scala PageRank
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1a911320
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1a911320
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1a911320
Branch: refs/heads/master
Commit: 1a911320287b0e9c96e23f38586f421afd0c5eb2
Parents: f66892d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 27 15:15:10 2014 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 27 16:01:39 2014 +0100
----------------------------------------------------------------------
.../examples/scala/graph/PageRankBasic.scala | 25 ++++++++++++--------
1 file changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a911320/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
index e032bee..8eeef70 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -17,12 +17,17 @@
*/
package org.apache.flink.examples.scala.graph
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.GroupReduceFunction
import org.apache.flink.api.scala._
import org.apache.flink.examples.java.graph.util.PageRankData
import org.apache.flink.api.java.aggregation.Aggregations.SUM
import org.apache.flink.util.Collector
+import scala.collection.JavaConverters._
+
/**
* A basic implementation of the Page Rank algorithm using a bulk iteration.
*
@@ -83,12 +88,13 @@ object PageRankBasic {
// build adjacency list from link input
val adjacencyLists = links
- // initialize lists
- .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
- // concatenate lists
- .groupBy("sourceId").reduce {
- (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
- }
+ .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
+ override def reduce(values: Iterable[Link], out: Collector[AdjacencyList]): Unit = {
+ var outputId = -1L
+ val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
+ out.collect(new AdjacencyList(outputId, outputList.toArray))
+ }
+ })
// start iteration
val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
@@ -97,9 +103,9 @@ object PageRankBasic {
// distribute ranks to target pages
.join(adjacencyLists).where("pageId").equalTo("sourceId") {
(page, adjacent, out: Collector[Page]) =>
- for (targetId <- adjacent.targetIds) {
- out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
- }
+ val targets = adjacent.targetIds
+ val len = targets.length
+ adjacent.targetIds foreach { t => out.collect(Page(t, page.rank /len )) }
}
// collect ranks and sum them up
.groupBy("pageId").aggregate(SUM, "rank")
@@ -114,7 +120,6 @@ object PageRankBasic {
// check for significant update
if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
}
-
(newRanks, termination)
}