You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/11/27 16:02:44 UTC

[3/3] incubator-flink git commit: [scala] Fix slow building of adjacency list in Scala PageRank

[scala] Fix slow building of adjacency list in Scala PageRank


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/1a911320
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/1a911320
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/1a911320

Branch: refs/heads/master
Commit: 1a911320287b0e9c96e23f38586f421afd0c5eb2
Parents: f66892d
Author: Aljoscha Krettek <al...@gmail.com>
Authored: Thu Nov 27 15:15:10 2014 +0100
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Thu Nov 27 16:01:39 2014 +0100

----------------------------------------------------------------------
 .../examples/scala/graph/PageRankBasic.scala    | 25 ++++++++++++--------
 1 file changed, 15 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/1a911320/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
index e032bee..8eeef70 100644
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -17,12 +17,17 @@
  */
 package org.apache.flink.examples.scala.graph
 
+import java.lang.Iterable
+
+import org.apache.flink.api.common.functions.GroupReduceFunction
 import org.apache.flink.api.scala._
 import org.apache.flink.examples.java.graph.util.PageRankData
 import org.apache.flink.api.java.aggregation.Aggregations.SUM
 
 import org.apache.flink.util.Collector
 
+import scala.collection.JavaConverters._
+
 /**
  * A basic implementation of the Page Rank algorithm using a bulk iteration.
  * 
@@ -83,12 +88,13 @@ object PageRankBasic {
 
     // build adjacency list from link input
     val adjacencyLists = links
-      // initialize lists
-      .map(e => AdjacencyList(e.sourceId, Array(e.targetId)))
-      // concatenate lists
-      .groupBy("sourceId").reduce {
-      (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds)
-      }
+      .groupBy("sourceId").reduceGroup( new GroupReduceFunction[Link, AdjacencyList] {
+        override def reduce(values: Iterable[Link], out: Collector[AdjacencyList]): Unit = {
+          var outputId = -1L
+          val outputList = values.asScala map { t => outputId = t.sourceId; t.targetId }
+          out.collect(new AdjacencyList(outputId, outputList.toArray))
+        }
+      })
 
     // start iteration
     val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
@@ -97,9 +103,9 @@ object PageRankBasic {
           // distribute ranks to target pages
           .join(adjacencyLists).where("pageId").equalTo("sourceId") {
             (page, adjacent, out: Collector[Page]) =>
-            for (targetId <- adjacent.targetIds) {
-              out.collect(Page(targetId, page.rank / adjacent.targetIds.length))
-            }
+              val targets = adjacent.targetIds
+              val len = targets.length
+              adjacent.targetIds foreach { t => out.collect(Page(t, page.rank /len )) }
           }
           // collect ranks and sum them up
           .groupBy("pageId").aggregate(SUM, "rank")
@@ -114,7 +120,6 @@ object PageRankBasic {
             // check for significant update
             if (math.abs(current.rank - next.rank) > EPSILON) out.collect(1)
         }
-
         (newRanks, termination)
     }