You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:16 UTC
[34/60] git commit: Adds PageRankBasic Scala example. Removes old
Scala examples
Adds PageRankBasic Scala example. Removes old Scala examples
Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0dc76147
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0dc76147
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0dc76147
Branch: refs/heads/master
Commit: 0dc761473a4bfad5f22152cec443dde7cfaee7eb
Parents: 81e81b9
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Sep 11 15:42:29 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:58 2014 +0200
----------------------------------------------------------------------
.../flink/example/java/graph/PageRankBasic.java | 23 ++-
.../example/java/graph/util/PageRankData.java | 83 ++++-----
.../scala/graph/ComputeEdgeDegrees.scala | 123 ------------
.../flink/examples/scala/graph/PageRank.scala | 108 -----------
.../examples/scala/graph/PageRankBasic.scala | 185 +++++++++++++++++++
.../scala/graph/PageRankWithWeight.scala | 108 -----------
.../iterations/PageRankCompilerTest.java | 4 +-
7 files changed, 240 insertions(+), 394 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
index 656c424..8a57007 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
@@ -91,7 +91,7 @@ public class PageRankBasic {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// get input data
- DataSet<Tuple1<Long>> pagesInput = getPagesDataSet(env);
+ DataSet<Long> pagesInput = getPagesDataSet(env);
DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env);
// assign initial rank to pages
@@ -138,7 +138,7 @@ public class PageRankBasic {
/**
* A map function that assigns an initial rank to all pages.
*/
- public static final class RankAssigner implements MapFunction<Tuple1<Long>, Tuple2<Long, Double>> {
+ public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> {
Tuple2<Long, Double> outPageWithRank;
public RankAssigner(double rank) {
@@ -146,8 +146,8 @@ public class PageRankBasic {
}
@Override
- public Tuple2<Long, Double> map(Tuple1<Long> page) {
- outPageWithRank.f0 = page.f0;
+ public Tuple2<Long, Double> map(Long page) {
+ outPageWithRank.f0 = page;
return outPageWithRank;
}
}
@@ -259,12 +259,17 @@ public class PageRankBasic {
return true;
}
- private static DataSet<Tuple1<Long>> getPagesDataSet(ExecutionEnvironment env) {
+ private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env) {
if(fileOutput) {
- return env.readCsvFile(pagesInputPath)
- .fieldDelimiter(' ')
- .lineDelimiter("\n")
- .types(Long.class);
+ return env
+ .readCsvFile(pagesInputPath)
+ .fieldDelimiter(' ')
+ .lineDelimiter("\n")
+ .types(Long.class)
+ .map(new MapFunction<Tuple1<Long>, Long>() {
+ @Override
+ public Long map(Tuple1<Long> v) { return v.f0; }
+ });
} else {
return PageRankData.getDefaultPagesDataSet(env);
}
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
index 6e494a4..10cf748 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
@@ -22,11 +22,9 @@ package org.apache.flink.example.java.graph.util;
import java.util.ArrayList;
import java.util.List;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
/**
* Provides the default data sets used for the PageRank example program.
@@ -35,52 +33,51 @@ import org.apache.flink.api.java.ExecutionEnvironment;
*/
public class PageRankData {
+ public static final Object[][] EDGES = {
+ {1L, 2L},
+ {1L, 15L},
+ {2L, 3L},
+ {2L, 4L},
+ {2L, 5L},
+ {2L, 6L},
+ {2L, 7L},
+ {3L, 13L},
+ {4L, 2L},
+ {5L, 11L},
+ {5L, 12L},
+ {6L, 1L},
+ {6L, 7L},
+ {6L, 8L},
+ {7L, 1L},
+ {7L, 8L},
+ {8L, 1L},
+ {8L, 9L},
+ {8L, 10L},
+ {9L, 14L},
+ {9L, 1L},
+ {10L, 1L},
+ {10L, 13L},
+ {11L, 12L},
+ {11L, 1L},
+ {12L, 1L},
+ {13L, 14L},
+ {14L, 12L},
+ {15L, 1L},
+ };
+
private static long numPages = 15;
- public static DataSet<Tuple1<Long>> getDefaultPagesDataSet(ExecutionEnvironment env) {
-
- List<Tuple1<Long>> data = new ArrayList<Tuple1<Long>>();
+ public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
- for(long i=0; i<numPages; i++) {
- data.add(new Tuple1<Long>(i));
+ List<Tuple2<Long, Long>> edges = new ArrayList<Tuple2<Long, Long>>();
+ for(Object[] e : EDGES) {
+ edges.add(new Tuple2<Long, Long>((Long)e[0], (Long)e[1]));
}
- return env.fromCollection(data);
+ return env.fromCollection(edges);
}
- public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-
- List<Tuple2<Long, Long>> data = new ArrayList<Tuple2<Long, Long>>();
- data.add(new Tuple2<Long, Long>(1L, 2L));
- data.add(new Tuple2<Long, Long>(1L, 15L));
- data.add(new Tuple2<Long, Long>(2L, 3L));
- data.add(new Tuple2<Long, Long>(2L, 4L));
- data.add(new Tuple2<Long, Long>(2L, 5L));
- data.add(new Tuple2<Long, Long>(2L, 6L));
- data.add(new Tuple2<Long, Long>(2L, 7L));
- data.add(new Tuple2<Long, Long>(3L, 13L));
- data.add(new Tuple2<Long, Long>(4L, 2L));
- data.add(new Tuple2<Long, Long>(5L, 11L));
- data.add(new Tuple2<Long, Long>(5L, 12L));
- data.add(new Tuple2<Long, Long>(6L, 1L));
- data.add(new Tuple2<Long, Long>(6L, 7L));
- data.add(new Tuple2<Long, Long>(6L, 8L));
- data.add(new Tuple2<Long, Long>(7L, 1L));
- data.add(new Tuple2<Long, Long>(7L, 8L));
- data.add(new Tuple2<Long, Long>(8L, 1L));
- data.add(new Tuple2<Long, Long>(8L, 9L));
- data.add(new Tuple2<Long, Long>(8L, 10L));
- data.add(new Tuple2<Long, Long>(9L, 14L));
- data.add(new Tuple2<Long, Long>(9L, 1L));
- data.add(new Tuple2<Long, Long>(10L, 1L));
- data.add(new Tuple2<Long, Long>(10L, 13L));
- data.add(new Tuple2<Long, Long>(11L, 12L));
- data.add(new Tuple2<Long, Long>(11L, 1L));
- data.add(new Tuple2<Long, Long>(12L, 1L));
- data.add(new Tuple2<Long, Long>(13L, 14L));
- data.add(new Tuple2<Long, Long>(14L, 12L));
- data.add(new Tuple2<Long, Long>(15L, 1L));
-
- return env.fromCollection(data);
+ public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env) { // default page IDs for the example
+ return env.generateSequence(1, numPages); // IDs 1..numPages (15 by default) instead of a duplicated literal
}
public static long getNumberOfPages() {
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ComputeEdgeDegrees.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ComputeEdgeDegrees.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ComputeEdgeDegrees.scala
deleted file mode 100644
index 5200074..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ComputeEdgeDegrees.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.examples.scala.graph
-//
-//import org.apache.flink.client.LocalExecutor
-//import org.apache.flink.api.common.Program
-//import org.apache.flink.api.common.ProgramDescription
-//
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//
-///**
-// * Annotates edges with associated vertex degrees.
-// */
-//class ComputeEdgeDegrees extends Program with ProgramDescription with Serializable {
-// override def getDescription() = {
-// "Parameters: [numSubStasks] [input file] [output file]"
-// }
-// override def getPlan(args: String*) = {
-// getScalaPlan(args(0).toInt, args(1), args(2))
-// }
-//
-// /*
-// * Output formatting function for edges with annotated degrees
-// */
-// def formatEdgeWithDegrees = (v1: Int, v2: Int, c1: Int, c2: Int) => "%d,%d|%d,%d".format(v1, v2, c1, c2)
-//
-// /*
-// * Emits one edge for each unique input edge with the vertex degree of the first(and grouping key) vertex.
-// * The degree of the second (non-grouping key) vertexes are set to zero.
-// * Edges are projected such that smaller vertex is the first vertex.
-// */
-// def annotateFirstVertexDegree(eI: Iterator[(Int, Int)]): List[(Int, Int, Int, Int)] = {
-// val eL = eI.toList
-// val eLUniq = eL.distinct
-// val cnt = eLUniq.size
-// for (e <- eLUniq)
-// yield if (e._1 < e._2)
-// (e._1, e._2, cnt, 0)
-// else
-// (e._2, e._1, 0, cnt)
-// }
-//
-// /*
-// * Combines the degrees of both vertexes of an edge.
-// */
-// def combineVertexDegrees(eI: Iterator[(Int, Int, Int, Int)]) : (Int, Int, Int, Int) = {
-//
-// val eL = eI.toList
-// if (eL.size != 2)
-// throw new RuntimeException("Problem when combinig vertex counts");
-//
-// if (eL(0)._3 == 0 && eL(1)._4 == 0)
-// (eL(0)._1, eL(1)._3, eL(0)._2, eL(0)._4)
-// else
-// (eL(0)._1, eL(0)._3, eL(0)._2, eL(1)._4)
-//
-// }
-//
-// def getScalaPlan(numSubTasks: Int, edgeInput: String, annotatedEdgeOutput: String) = {
-//
-// /*
-// * Input format for edges.
-// * Edges are separated by new line '\n'.
-// * An edge is represented as two Integer vertex IDs which are separated by a blank ','.
-// */
-// val edges = DataSource(edgeInput, CsvInputFormat[(Int, Int)]("\n", ','))
-//
-// /*
-// * Emit each edge twice with both vertex orders.
-// */
-// val projEdges = edges flatMap { (e) => Iterator((e._1, e._2) , (e._2, e._1)) }
-//
-// /*
-// * Annotates each edges with degree for the first vertex.
-// */
-// val vertexCnts = projEdges groupBy { _._1 } reduceGroup { annotateFirstVertexDegree } flatMap {x => x.iterator }
-//
-// /*
-// * Combines the degrees of both vertexes of an edge.
-// */
-// val combinedVertexCnts = vertexCnts groupBy { (x) => (x._1, x._2) } reduceGroup { combineVertexDegrees }
-//
-// /*
-// * Emit annotated edges.
-// */
-// val output = combinedVertexCnts.write(annotatedEdgeOutput, DelimitedOutputFormat(formatEdgeWithDegrees.tupled))
-//
-// val plan = new ScalaPlan(Seq(output), "Compute Edge Degrees")
-// plan.setDefaultParallelism(numSubTasks)
-// plan
-// }
-//
-// object RunComputeEdgeDegrees {
-// def main(args: Array[String]) {
-// val ced = new ComputeEdgeDegrees
-// if (args.size < 3) {
-// println(ced.getDescription)
-// return
-// }
-// val plan = ced.getScalaPlan(args(0).toInt, args(1), args(2))
-// LocalExecutor.execute(plan)
-// System.exit(0)
-// }
-//}
-//}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRank.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRank.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRank.scala
deleted file mode 100644
index 2ef029f..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRank.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.examples.scala.graph;
-//
-//import org.apache.flink.client.LocalExecutor
-//import org.apache.flink.api.common.Program
-//
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//
-///**
-// * An example program computing the page rank for each vertex in a graph.
-// * The graph is initially represented by vertices and edges. Vertices are numeric identifiers, while
-// * edges are pairs of identifiers that represent the source and target vertex.
-// *
-// * This variant of page rank assumes that all edges that originate at one vertex have an equal
-// * probability of being chosen.
-// */
-//class PageRank extends Program with Serializable {
-//
-// def getScalaPlan(verticesPath: String, edgesPath: String, outputPath: String, numVertices: Long, maxIterations: Int) = {
-//
-// case class PageWithRank(pageId: Long, rank: Double)
-// case class Edge(from: Long, to: Long)
-// case class Adjacency(vertex: Long, neighbors: List[Long])
-//
-// // read the pages and edges. the pages are only single decimal identifiers, the edges pairs of identifiers
-// val pages = DataSource(verticesPath, CsvInputFormat[Long]())
-// val edges = DataSource(edgesPath, CsvInputFormat[Edge]("\n", ' '))
-//
-// // some constants used in the specific rank computation
-// val dampening = 0.85
-// val randomJump = (1.0 - dampening) / numVertices
-// val initialRank = 1.0 / numVertices
-//
-// // assign the initial uniform rank to all pages
-// val pagesWithRank = pages map { p => PageWithRank(p, initialRank) }
-//
-// // transform the edges from a list of (from -> target) pairs to an adjacency list (from -> [all-targets])
-// val adjacencies = edges.groupBy(_.from).reduceGroup(x => x.foldLeft(Adjacency(0, List[Long]()))((a, e) => Adjacency(e.from, e.to :: a.neighbors)));
-//
-// def computeRank(ranks: DataSetOLD[PageWithRank]) = {
-//
-// val ranksForNeighbors = ranks join adjacencies where { _.pageId } isEqualTo { _.vertex } flatMap ( (p, e) => {
-// val numNeighbors = e.neighbors.length
-//
-// for (target <- e.neighbors)
-// yield (target, p.rank / numNeighbors)
-//
-// });
-//
-// ranksForNeighbors.groupBy { case (node, rank) => node }
-// .reduce { (a, b) => (a._1, a._2 + b._2) }
-// .map { case (node, rank) => PageWithRank(node, rank * dampening + randomJump) }
-// }
-//
-// val finalRanks = pagesWithRank.iterate(maxIterations, computeRank)
-//
-// val output = finalRanks.write(outputPath, CsvOutputFormat())
-//
-// new ScalaPlan(Seq(output), "Page Rank")
-// }
-//
-// override def getPlan(args: String*) = {
-// val planArgs: Array[String] = if (args.length < 5) Array[String]("", "", "", "", "") else args.toArray
-// val dop = if (args.size > 5) args(5).toInt else 1
-//
-// val plan = getScalaPlan(planArgs(0), planArgs(1), planArgs(2), planArgs(3).toLong, planArgs(4).toInt)
-// plan.setDefaultParallelism(dop)
-// plan
-// }
-//}
-//
-///**
-// * Executable entry point to run the program locally.
-// */
-//object RunPageRank {
-//
-// def main(args: Array[String]) {
-// if (args.size < 5) {
-// println("PageRank <pages input path> <links input path> <result path> <num pages> <num iterations> [<parallelism=1>]")
-// return
-// }
-//
-// val dop = if (args.length > 5) args(5).toInt else 1
-// val plan = new PageRank().getScalaPlan(args(0), args(1), args(2), args(3).toLong, args(4).toInt);
-//
-// plan.setDefaultParallelism(dop)
-// LocalExecutor.execute(plan)
-// }
-//}
-//
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
new file mode 100644
index 0000000..e24727c
--- /dev/null
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.graph
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+import org.apache.flink.example.java.graph.util.PageRankData
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.aggregation.Aggregations.SUM
+
+
+/**
+ * A basic implementation of the Page Rank algorithm using a bulk iteration.
+ *
+ * <p>
+ * This implementation requires a set of pages and a set of directed links as input and works as follows. <br>
+ * In each iteration, the rank of every page is evenly distributed to all pages it points to.
+ * Each page collects the partial ranks of all pages that point to it, sums them up, and applies a dampening factor to the sum.
+ * The result is the new rank of the page. A new iteration is started with the new ranks of all pages.
+ * This implementation terminates after at most the given number of iterations, or earlier once no page's rank changes by more than EPSILON in an iteration.<br>
+ * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/Page_rank">Page Rank algorithm</a>.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Pages are represented as (long) IDs separated by new-line characters.<br>
+ * For example <code>"1\n2\n12\n42\n63\n"</code> gives five pages with IDs 1, 2, 12, 42, and 63.
+ * <li>Links are represented as pairs of page IDs which are separated by space
+ * characters. Links are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).<br>
+ * For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).
+ * </ul>
+ *
+ * <p>
+ * Usage: <code>PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations></code><br>
+ * If no parameters are provided, the program is run with default data from {@link PageRankData} and 10 iterations.
+ *
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Bulk Iterations
+ * <li>Default Join
+ * <li>Configure user-defined functions using constructor parameters.
+ * </ul>
+ *
+ */
+object PageRankBasic {
+
+ private final val DAMPENING_FACTOR: Double = 0.85; // weight of the summed incoming ranks; the remainder is spread evenly over numPages
+ private final val EPSILON: Double = 0.0001; // a rank change larger than this counts as significant for the termination check
+
+ def main(args: Array[String]) { // entry point: parse arguments, build the PageRank plan, emit the result, run the job
+ if (!parseParameters(args)) {
+ return
+ }
+
+ // set up execution environment
+ val env = ExecutionEnvironment.getExecutionEnvironment
+
+ // read input data (from files or built-in defaults, see getPagesDataSet / getLinksDataSet)
+ val pages = getPagesDataSet(env)
+ val links = getLinksDataSet(env)
+
+ // assign the same initial rank, 1/numPages, to every page
+ val pagesWithRanks = pages.map(p => Page(p, (1.0/numPages)))
+
+ // build adjacency list from link input
+ val adjacencyLists = links
+ // initialize lists: one single-element list per link
+ .map( e => AdjacencyList(e.sourceId, Array[java.lang.Long](e.targetId) ))
+ // concatenate lists of the same source page (field 0 = sourceId)
+ .groupBy(0).reduce( (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds))
+
+ // start iteration, bounded by maxIterations
+ val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
+ currentRanks =>
+ val newRanks = currentRanks
+ // distribute ranks to target pages: each target receives rank / number of targets
+ .join(adjacencyLists).where(0).equalTo(0)
+ .flatMap { x => for(targetId <- x._2.targetIds) yield Page(targetId, (x._1.rank / x._2.targetIds.length))}
+ // collect ranks and sum them up per page (field 0 = pageId, field 1 = rank)
+ .groupBy(0).aggregate(SUM, 1)
+ // apply dampening factor and add the evenly distributed remainder
+ .map { p => Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages) ) }
+
+ // terminate if no rank update was significant
+ val termination = currentRanks
+ .join(newRanks).where(0).equalTo(0)
+ // check for significant update: keep only pages whose rank moved by more than EPSILON
+ .filter( x => math.abs(x._1.rank - x._2.rank) > EPSILON )
+
+ (newRanks, termination) // ranks for the next iteration, plus the data set used as termination criterion
+ }
+
+ val result = finalRanks;
+
+ // emit result: CSV file when arguments were given, otherwise print
+ if (fileOutput) {
+ result.writeAsCsv(outputPath, "\n", " ")
+ } else {
+ result.print()
+ }
+
+ // execute program
+ env.execute("Basic PageRank Example")
+ }
+
+ // *************************************************************************
+ // USER TYPES
+ // *************************************************************************
+
+ case class Link(sourceId: Long, targetId: Long) // one directed link from sourceId to targetId
+ case class Page(pageId: java.lang.Long, rank: Double) // a page ID together with a rank value
+ case class AdjacencyList(sourceId: java.lang.Long, targetIds: Array[java.lang.Long]) // a source page and the IDs of the pages it links to
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private def parseParameters(args: Array[String]): Boolean = { // fills the configuration vars; returns false only on a usage error
+ if (args.length > 0) {
+ fileOutput = true // any argument switches the program to file input and output
+ if (args.length == 5) {
+ pagesInputPath = args(0)
+ linksInputPath = args(1)
+ outputPath = args(2)
+ numPages = args(3).toLong
+ maxIterations = args(4).toInt
+ } else {
+ System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
+ return false // explicit return: a bare `false` here is discarded and the method would fall through to `true`
+ }
+ } else {
+ System.out.println("Executing PageRank Basic example with default parameters and built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println(" Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
+
+ numPages = PageRankData.getNumberOfPages(); // no arguments: use the built-in page count, maxIterations keeps its default
+ }
+ true
+ }
+
+ private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = { // page IDs, read from pagesInputPath or generated as defaults
+ if(fileOutput) { // fileOutput is set by parseParameters whenever arguments were given, so it also selects file input
+ env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = ' ', lineDelimiter = "\n")
+ .map(x => x._1) // unwrap the single-field tuple into a plain Long
+ } else {
+ env.fromCollection(Seq.range(1, PageRankData.getNumberOfPages()+1)) // IDs 1..getNumberOfPages(); the range end is exclusive, hence +1
+ }
+ }
+
+ private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = { // links, read from linksInputPath or taken from PageRankData.EDGES
+ if (fileOutput) { // fileOutput is set by parseParameters whenever arguments were given, so it also selects file input
+ env.readCsvFile[(Long, Long)](linksInputPath, fieldDelimiter = ' ', includedFields = Array(0, 1))
+ .map { x => Link(x._1, x._2) } // first field is the source ID, second the target ID
+ } else {
+ val edges = PageRankData.EDGES.map{ case Array(v1, v2) => Link(v1.asInstanceOf[Long], v2.asInstanceOf[Long]) } // EDGES is an Object[][] of {source, target} pairs, so each entry is cast to Long
+ env.fromCollection(edges)
+ }
+ }
+
+ private var fileOutput: Boolean = false // true once any command-line argument was given; selects file input and CSV output
+ private var pagesInputPath: String = null // args(0); stays null when running on default data
+ private var linksInputPath: String = null // args(1); stays null when running on default data
+ private var outputPath: String = null // args(2); stays null when running on default data
+ private var numPages: Long = 0; // args(3), or PageRankData.getNumberOfPages() when no arguments are given
+ private var maxIterations: Int = 10; // args(4); 10 when no arguments are given
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankWithWeight.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankWithWeight.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankWithWeight.scala
deleted file mode 100644
index 97ee62e..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankWithWeight.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements. See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership. The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License. You may obtain a copy of the License at
-// *
-// * http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.examples.scala.graph;
-//
-//import scala.math._
-//
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//import org.apache.flink.api.scala.analysis.GlobalSchemaPrinter
-//
-//import org.apache.flink.client.LocalExecutor
-//import org.apache.flink.api.common.Program
-//import org.apache.flink.api.common.ProgramDescription
-//import org.apache.flink.api.common.Plan
-//import org.apache.flink.api.java.record.operators.DeltaIteration
-//
-///**
-// * An implementation of the PageRank algorithm for graph vertex ranking. Runs a specified fix number
-// * of iterations. This version of page rank expects the edges to define a transition
-// * probability and hence allows to model situations where not all outgoing links are equally probable.
-// *
-// * <p>
-// *
-// * Expects inputs are:
-// * 1. Path to a file of node ids, as a sequence of Longs, line delimited.
-// * 2. Path to a csv file of edges in the format <tt>sourceId targetId transitionProbability</tt> (fields separated by spaces).
-// * The ids are expected to be Longs, the transition probability a float or double.
-// * 3. Path to where the output should be written
-// * 4. The number of vertices
-// * 5. The number of iterations
-// */
-//class PageRankWithWeight extends Program with Serializable {
-//
-// def getScalaPlan(verticesPath: String, edgesPath: String, outputPath: String, numVertices: Long, maxIterations: Int) = {
-//
-// case class PageWithRank(pageId: Long, rank: Double)
-// case class Edge(from: Long, to: Long, transitionProb: Double)
-//
-// val pages = DataSource(verticesPath, CsvInputFormat[Long]())
-// val edges = DataSource(edgesPath, CsvInputFormat[Edge]("\n", ' ')) // line delimiter (\n), field delimiter (' ')
-//
-// val dampening = 0.85
-// val randomJump = (1.0 - dampening) / numVertices
-// val initialRank = 1.0 / numVertices
-//
-// val pagesWithRank = pages map { p => PageWithRank(p, initialRank) }
-//
-// def computeRank(ranks: DataSetOLD[PageWithRank]) = {
-//
-// val ranksForNeighbors = ranks join edges where { _.pageId } isEqualTo { _.from } map { (p, e) => (e.to, p.rank * e.transitionProb) }
-//
-// ranksForNeighbors.groupBy { case (node, rank) => node }
-// .reduce { (a, b) => (a._1, a._2 + b._2) }
-// .map { case (node, rank) => PageWithRank(node, rank * dampening + randomJump) }
-// }
-//
-// val finalRanks = pagesWithRank.iterate(maxIterations, computeRank)
-//
-// val output = finalRanks.write(outputPath, CsvOutputFormat())
-//
-// new ScalaPlan(Seq(output), "Connected Components")
-// }
-//
-// override def getPlan(args: String*) = {
-// val planArgs: Array[String] = if (args.length < 5) Array[String]("", "", "", "", "") else args.toArray
-// val dop = if (args.size > 5) args(5).toInt else 1
-//
-// val plan = getScalaPlan(planArgs(0), planArgs(1), planArgs(2), planArgs(3).toLong, planArgs(4).toInt)
-// plan.setDefaultParallelism(dop)
-// plan
-// }
-//}
-//
-///**
-// * Executable entry point to run the program locally.
-// */
-//object RunPageRankWithWeight {
-//
-// def main(args: Array[String]) {
-// if (args.size < 5) {
-// println("PageRank <vertices> <edges> <result> <numVertices> <numIterations> [<parallelism=1>]")
-// return
-// }
-//
-// val dop = if (args.length > 5) args(5).toInt else 1
-// val plan = new PageRankWithWeight().getScalaPlan(args(0), args(1), args(2), args(3).toLong, args(4).toInt);
-//
-// plan.setDefaultParallelism(dop)
-// LocalExecutor.execute(plan)
-// }
-//}
-//
http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
index 241a376..eb6fe9f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
@@ -23,7 +23,6 @@ import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
import static org.junit.Assert.fail;
import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.compiler.PactCompiler;
import org.apache.flink.compiler.plan.BulkIterationPlanNode;
@@ -53,8 +52,7 @@ public class PageRankCompilerTest extends CompilerTestBase{
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// get input data
- @SuppressWarnings("unchecked")
- DataSet<Tuple1<Long>> pagesInput = env.fromElements(new Tuple1<Long>(1l));
+ DataSet<Long> pagesInput = env.fromElements(1l);
@SuppressWarnings("unchecked")
DataSet<Tuple2<Long, Long>> linksInput =env.fromElements(new Tuple2<Long, Long>(1l, 2l));