You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by al...@apache.org on 2014/09/22 14:29:16 UTC

[34/60] git commit: Adds PageRankBasic Scala example. Removes old Scala examples

Adds PageRankBasic Scala example. Removes old Scala examples


Project: http://git-wip-us.apache.org/repos/asf/incubator-flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-flink/commit/0dc76147
Tree: http://git-wip-us.apache.org/repos/asf/incubator-flink/tree/0dc76147
Diff: http://git-wip-us.apache.org/repos/asf/incubator-flink/diff/0dc76147

Branch: refs/heads/master
Commit: 0dc761473a4bfad5f22152cec443dde7cfaee7eb
Parents: 81e81b9
Author: Fabian Hueske <fh...@apache.org>
Authored: Thu Sep 11 15:42:29 2014 +0200
Committer: Aljoscha Krettek <al...@gmail.com>
Committed: Mon Sep 22 09:59:58 2014 +0200

----------------------------------------------------------------------
 .../flink/example/java/graph/PageRankBasic.java |  23 ++-
 .../example/java/graph/util/PageRankData.java   |  83 ++++-----
 .../scala/graph/ComputeEdgeDegrees.scala        | 123 ------------
 .../flink/examples/scala/graph/PageRank.scala   | 108 -----------
 .../examples/scala/graph/PageRankBasic.scala    | 185 +++++++++++++++++++
 .../scala/graph/PageRankWithWeight.scala        | 108 -----------
 .../iterations/PageRankCompilerTest.java        |   4 +-
 7 files changed, 240 insertions(+), 394 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
index 656c424..8a57007 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/PageRankBasic.java
@@ -91,7 +91,7 @@ public class PageRankBasic {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		
 		// get input data
-		DataSet<Tuple1<Long>> pagesInput = getPagesDataSet(env);
+		DataSet<Long> pagesInput = getPagesDataSet(env);
 		DataSet<Tuple2<Long, Long>> linksInput = getLinksDataSet(env);
 		
 		// assign initial rank to pages
@@ -138,7 +138,7 @@ public class PageRankBasic {
 	/** 
 	 * A map function that assigns an initial rank to all pages. 
 	 */
-	public static final class RankAssigner implements MapFunction<Tuple1<Long>, Tuple2<Long, Double>> {
+	public static final class RankAssigner implements MapFunction<Long, Tuple2<Long, Double>> {
 		Tuple2<Long, Double> outPageWithRank;
 		
 		public RankAssigner(double rank) {
@@ -146,8 +146,8 @@ public class PageRankBasic {
 		}
 		
 		@Override
-		public Tuple2<Long, Double> map(Tuple1<Long> page) {
-			outPageWithRank.f0 = page.f0;
+		public Tuple2<Long, Double> map(Long page) {
+			outPageWithRank.f0 = page;
 			return outPageWithRank;
 		}
 	}
@@ -259,12 +259,17 @@ public class PageRankBasic {
 		return true;
 	}
 	
-	private static DataSet<Tuple1<Long>> getPagesDataSet(ExecutionEnvironment env) {
+	private static DataSet<Long> getPagesDataSet(ExecutionEnvironment env) {
 		if(fileOutput) {
-			return env.readCsvFile(pagesInputPath)
-						.fieldDelimiter(' ')
-						.lineDelimiter("\n")
-						.types(Long.class);
+			return env
+						.readCsvFile(pagesInputPath)
+							.fieldDelimiter(' ')
+							.lineDelimiter("\n")
+							.types(Long.class)
+						.map(new MapFunction<Tuple1<Long>, Long>() {
+							@Override
+							public Long map(Tuple1<Long> v) { return v.f0; }
+						});
 		} else {
 			return PageRankData.getDefaultPagesDataSet(env);
 		}

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
index 6e494a4..10cf748 100644
--- a/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
+++ b/flink-examples/flink-java-examples/src/main/java/org/apache/flink/example/java/graph/util/PageRankData.java
@@ -22,11 +22,9 @@ package org.apache.flink.example.java.graph.util;
 import java.util.ArrayList;
 import java.util.List;
 
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
 
 /**
  * Provides the default data sets used for the PageRank example program.
@@ -35,52 +33,51 @@ import org.apache.flink.api.java.ExecutionEnvironment;
  */
 public class PageRankData {
 
+	public static final Object[][] EDGES = { // each row is a {source, target} pair of Long page IDs; also read by the Scala PageRankBasic example
+		{1L, 2L},
+		{1L, 15L},
+		{2L, 3L},
+		{2L, 4L},
+		{2L, 5L},
+		{2L, 6L},
+		{2L, 7L},
+		{3L, 13L},
+		{4L, 2L},
+		{5L, 11L},
+		{5L, 12L},
+		{6L, 1L},
+		{6L, 7L},
+		{6L, 8L},
+		{7L, 1L},
+		{7L, 8L},
+		{8L, 1L},
+		{8L, 9L},
+		{8L, 10L},
+		{9L, 14L},
+		{9L, 1L},
+		{10L, 1L},
+		{10L, 13L},
+		{11L, 12L},
+		{11L, 1L},
+		{12L, 1L},
+		{13L, 14L},
+		{14L, 12L},
+		{15L, 1L},
+	};
+	
 	private static long numPages = 15;
 	
-	public static DataSet<Tuple1<Long>> getDefaultPagesDataSet(ExecutionEnvironment env) {
-		
-		List<Tuple1<Long>> data = new ArrayList<Tuple1<Long>>();
+	public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
 		
-		for(long i=0; i<numPages; i++) {
-			data.add(new Tuple1<Long>(i));
+		List<Tuple2<Long, Long>> edges = new ArrayList<Tuple2<Long, Long>>();
+		for(Object[] e : EDGES) { // convert each untyped {source, target} row of EDGES into a typed Tuple2 link
+			edges.add(new Tuple2<Long, Long>((Long)e[0], (Long)e[1]));
 		}
-		return env.fromCollection(data);
+		return env.fromCollection(edges);
 	}
 	
-	public static DataSet<Tuple2<Long, Long>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
-		
-		List<Tuple2<Long, Long>> data = new ArrayList<Tuple2<Long, Long>>();
-		data.add(new Tuple2<Long, Long>(1L, 2L));
-		data.add(new Tuple2<Long, Long>(1L, 15L));
-		data.add(new Tuple2<Long, Long>(2L, 3L));
-		data.add(new Tuple2<Long, Long>(2L, 4L));
-		data.add(new Tuple2<Long, Long>(2L, 5L));
-		data.add(new Tuple2<Long, Long>(2L, 6L));
-		data.add(new Tuple2<Long, Long>(2L, 7L));
-		data.add(new Tuple2<Long, Long>(3L, 13L));
-		data.add(new Tuple2<Long, Long>(4L, 2L));
-		data.add(new Tuple2<Long, Long>(5L, 11L));
-		data.add(new Tuple2<Long, Long>(5L, 12L));
-		data.add(new Tuple2<Long, Long>(6L, 1L));
-		data.add(new Tuple2<Long, Long>(6L, 7L));
-		data.add(new Tuple2<Long, Long>(6L, 8L));
-		data.add(new Tuple2<Long, Long>(7L, 1L));
-		data.add(new Tuple2<Long, Long>(7L, 8L));
-		data.add(new Tuple2<Long, Long>(8L, 1L));
-		data.add(new Tuple2<Long, Long>(8L, 9L));
-		data.add(new Tuple2<Long, Long>(8L, 10L));
-		data.add(new Tuple2<Long, Long>(9L, 14L));
-		data.add(new Tuple2<Long, Long>(9L, 1L));
-		data.add(new Tuple2<Long, Long>(10L, 1L));
-		data.add(new Tuple2<Long, Long>(10L, 13L));
-		data.add(new Tuple2<Long, Long>(11L, 12L));
-		data.add(new Tuple2<Long, Long>(11L, 1L));
-		data.add(new Tuple2<Long, Long>(12L, 1L));
-		data.add(new Tuple2<Long, Long>(13L, 14L));
-		data.add(new Tuple2<Long, Long>(14L, 12L));
-		data.add(new Tuple2<Long, Long>(15L, 1L));
-		
-		return env.fromCollection(data);
+	public static DataSet<Long> getDefaultPagesDataSet(ExecutionEnvironment env) {
+		return env.generateSequence(1, numPages); // page IDs 1..numPages, the same ID range used in EDGES and reported by getNumberOfPages()
 	}
 	
 	public static long getNumberOfPages() {

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ComputeEdgeDegrees.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ComputeEdgeDegrees.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ComputeEdgeDegrees.scala
deleted file mode 100644
index 5200074..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/ComputeEdgeDegrees.scala
+++ /dev/null
@@ -1,123 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//
-//package org.apache.flink.examples.scala.graph
-//
-//import org.apache.flink.client.LocalExecutor
-//import org.apache.flink.api.common.Program
-//import org.apache.flink.api.common.ProgramDescription
-//
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//
-///**
-// * Annotates edges with associated vertex degrees.
-// */
-//class ComputeEdgeDegrees extends Program with ProgramDescription with Serializable {
-//  override def getDescription() = {
-//    "Parameters: [numSubStasks] [input file] [output file]"
-//  }
-//  override def getPlan(args: String*) = {
-//    getScalaPlan(args(0).toInt, args(1), args(2))
-//  }
-//
-//  /*
-//   * Output formatting function for edges with annotated degrees
-//   */
-//  def formatEdgeWithDegrees = (v1: Int, v2: Int, c1: Int, c2: Int) => "%d,%d|%d,%d".format(v1, v2, c1, c2)
-//
-//  /*
-//   * Emits one edge for each unique input edge with the vertex degree of the first(and grouping key) vertex.
-//   * The degree of the second (non-grouping key) vertexes are set to zero.
-//   * Edges are projected such that smaller vertex is the first vertex.
-//   */
-//  def annotateFirstVertexDegree(eI: Iterator[(Int, Int)]): List[(Int, Int, Int, Int)] = {
-//    val eL = eI.toList
-//    val eLUniq = eL.distinct
-//    val cnt = eLUniq.size
-//    for (e <- eLUniq)
-//      yield if (e._1 < e._2)
-//    	  		(e._1, e._2, cnt, 0)
-//        	else
-//        		(e._2, e._1, 0, cnt)
-//  }
-//
-//  /*
-//   * Combines the degrees of both vertexes of an edge.
-//   */
-//  def combineVertexDegrees(eI: Iterator[(Int, Int, Int, Int)]) : (Int, Int, Int, Int) = {
-//
-//    val eL = eI.toList
-//    if (eL.size != 2)
-//    	throw new RuntimeException("Problem when combinig vertex counts");
-//
-//    if (eL(0)._3 == 0 && eL(1)._4 == 0)
-//      (eL(0)._1, eL(1)._3, eL(0)._2, eL(0)._4)
-//    else
-//      (eL(0)._1, eL(0)._3, eL(0)._2, eL(1)._4)
-//
-//  }
-//
-//  def getScalaPlan(numSubTasks: Int, edgeInput: String, annotatedEdgeOutput: String) = {
-//
-//    /*
-//     * Input format for edges.
-//     * Edges are separated by new line '\n'.
-//     * An edge is represented as two Integer vertex IDs which are separated by a blank ','.
-//     */
-//    val edges = DataSource(edgeInput, CsvInputFormat[(Int, Int)]("\n", ','))
-//
-//    /*
-//     * Emit each edge twice with both vertex orders.
-//     */
-//    val projEdges = edges flatMap { (e) => Iterator((e._1, e._2) , (e._2, e._1)) }
-//
-//    /*
-//     * Annotates each edges with degree for the first vertex.
-//     */
-//    val vertexCnts = projEdges groupBy { _._1 } reduceGroup { annotateFirstVertexDegree } flatMap {x => x.iterator }
-//
-//    /*
-//     * Combines the degrees of both vertexes of an edge.
-//     */
-//    val combinedVertexCnts = vertexCnts groupBy { (x) => (x._1, x._2) } reduceGroup { combineVertexDegrees }
-//
-//    /*
-//     * Emit annotated edges.
-//     */
-//    val output = combinedVertexCnts.write(annotatedEdgeOutput, DelimitedOutputFormat(formatEdgeWithDegrees.tupled))
-//
-//    val plan = new ScalaPlan(Seq(output), "Compute Edge Degrees")
-//    plan.setDefaultParallelism(numSubTasks)
-//    plan
-//  }
-//
-//  object RunComputeEdgeDegrees {
-//  def main(args: Array[String]) {
-//    val ced = new ComputeEdgeDegrees
-//    if (args.size < 3) {
-//      println(ced.getDescription)
-//      return
-//    }
-//    val plan = ced.getScalaPlan(args(0).toInt, args(1), args(2))
-//    LocalExecutor.execute(plan)
-//    System.exit(0)
-//  }
-//}
-//}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRank.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRank.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRank.scala
deleted file mode 100644
index 2ef029f..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRank.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.examples.scala.graph;
-//
-//import org.apache.flink.client.LocalExecutor
-//import org.apache.flink.api.common.Program
-//
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//
-///**
-// * An example program computing the page rank for each vertex in a graph.
-// * The graph is initially represented by vertices and edges. Vertices are numeric identifiers, while
-// * edges are pairs of identifiers that represent the source and target vertex.
-// *
-// * This variant of page rank assumes that all edges that originate at one vertex have an equal
-// * probability of being chosen.
-// */
-//class PageRank extends Program with Serializable {
-//
-//  def getScalaPlan(verticesPath: String, edgesPath: String, outputPath: String, numVertices: Long, maxIterations: Int) = {
-//
-//    case class PageWithRank(pageId: Long, rank: Double)
-//    case class Edge(from: Long, to: Long)
-//    case class Adjacency(vertex: Long, neighbors: List[Long])
-//
-//    // read the pages and edges. the pages are only single decimal identifiers, the edges pairs of identifiers
-//    val pages = DataSource(verticesPath, CsvInputFormat[Long]())
-//    val edges = DataSource(edgesPath, CsvInputFormat[Edge]("\n", ' '))
-//
-//    // some constants used in the specific rank computation
-//    val dampening = 0.85
-//    val randomJump = (1.0 - dampening) / numVertices
-//    val initialRank = 1.0 / numVertices
-//
-//    // assign the initial uniform rank to all pages
-//    val pagesWithRank = pages map { p => PageWithRank(p, initialRank) }
-//
-//    // transform the edges from a list of (from -> target) pairs to an adjacency list (from -> [all-targets])
-//    val adjacencies = edges.groupBy(_.from).reduceGroup(x => x.foldLeft(Adjacency(0, List[Long]()))((a, e) => Adjacency(e.from, e.to :: a.neighbors)));
-//
-//    def computeRank(ranks: DataSetOLD[PageWithRank]) = {
-//
-//      val ranksForNeighbors = ranks join adjacencies where { _.pageId } isEqualTo { _.vertex } flatMap ( (p, e) => {
-//        val numNeighbors = e.neighbors.length
-//
-//        for (target <- e.neighbors)
-//          yield (target, p.rank / numNeighbors)
-//
-//      });
-//
-//      ranksForNeighbors.groupBy { case (node, rank) => node }
-//        .reduce { (a, b) => (a._1, a._2 + b._2) }
-//        .map { case (node, rank) => PageWithRank(node, rank * dampening + randomJump) }
-//    }
-//
-//    val finalRanks = pagesWithRank.iterate(maxIterations, computeRank)
-//
-//    val output = finalRanks.write(outputPath, CsvOutputFormat())
-//
-//    new ScalaPlan(Seq(output), "Page Rank")
-//  }
-//
-//  override def getPlan(args: String*) = {
-//    val planArgs: Array[String] = if (args.length < 5) Array[String]("", "", "", "", "") else args.toArray
-//    val dop = if (args.size > 5) args(5).toInt else 1
-//
-//    val plan = getScalaPlan(planArgs(0), planArgs(1), planArgs(2), planArgs(3).toLong, planArgs(4).toInt)
-//    plan.setDefaultParallelism(dop)
-//    plan
-//  }
-//}
-//
-///**
-// * Executable entry point to run the program locally.
-// */
-//object RunPageRank {
-//
-//  def main(args: Array[String]) {
-//    if (args.size < 5) {
-//      println("PageRank <pages input path> <links input path> <result path> <num pages> <num iterations> [<parallelism=1>]")
-//      return
-//    }
-//
-//    val dop = if (args.length > 5) args(5).toInt else 1
-//    val plan = new PageRank().getScalaPlan(args(0), args(1), args(2), args(3).toLong, args(4).toInt);
-//
-//    plan.setDefaultParallelism(dop)
-//    LocalExecutor.execute(plan)
-//  }
-//}
-//

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
new file mode 100644
index 0000000..e24727c
--- /dev/null
+++ b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankBasic.scala
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.examples.scala.graph
+
+import scala.collection.JavaConverters._
+import org.apache.flink.api.scala._
+import org.apache.flink.example.java.graph.util.PageRankData
+import org.apache.flink.util.Collector
+import org.apache.flink.api.java.aggregation.Aggregations.SUM
+
+
+/**
+ * A basic implementation of the Page Rank algorithm using a bulk iteration.
+ * 
+ * <p>
+ * This implementation requires a set of pages and a set of directed links as input and works as follows. <br> 
+ * In each iteration, the rank of every page is evenly distributed to all pages it points to.
+ * Each page collects the partial ranks of all pages that point to it, sums them up, and applies a dampening factor to the sum.
+ * The result is the new rank of the page. A new iteration is started with the new ranks of all pages.
+ * This implementation terminates after a maximum number of iterations, or earlier once no page rank changes by more than a small epsilon.<br>
+ * This is the Wikipedia entry for the <a href="http://en.wikipedia.org/wiki/Page_rank">Page Rank algorithm</a>. 
+ * 
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <ul>
+ * <li>Pages are represented as a (long) ID separated by new-line characters.<br> 
+ * For example <code>"1\n2\n12\n42\n63\n"</code> gives five pages with IDs 1, 2, 12, 42, and 63.
+ * <li>Links are represented as pairs of page IDs which are separated by space 
+ * characters. Links are separated by new-line characters.<br>
+ * For example <code>"1 2\n2 12\n1 12\n42 63\n"</code> gives four (directed) links (1)->(2), (2)->(12), (1)->(12), and (42)->(63).<br>
+ * For this simple implementation it is required that each page has at least one incoming and one outgoing link (a page can point to itself).
+ * </ul>
+ * 
+ * <p>
+ * Usage: <code>PageRankBasic &lt;pages path&gt; &lt;links path&gt; &lt;output path&gt; &lt;num pages&gt; &lt;num iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from {@link PageRankData} and 10 iterations.
+ * 
+ * <p>
+ * This example shows how to use:
+ * <ul>
+ * <li>Bulk Iterations
+ * <li>Default Join
+ * <li>Scala case classes as data types.
+ * </ul> 
+ *
+ */
+object PageRankBasic {
+	
+	private final val DAMPENING_FACTOR: Double = 0.85;
+	private final val EPSILON: Double = 0.0001;
+
+	def main(args: Array[String]) {
+		if (!parseParameters(args)) {
+			return
+		}
+		
+		// set up execution environment
+		val env = ExecutionEnvironment.getExecutionEnvironment
+		
+		// read input data
+		val pages = getPagesDataSet(env)
+		val links = getLinksDataSet(env)
+		
+		// assign initial ranks to pages
+		val pagesWithRanks = pages.map(p => Page(p, (1.0/numPages)))
+		
+		// build adjacency list from link input
+		val adjacencyLists = links
+								// initialize lists
+								.map( e => AdjacencyList(e.sourceId, Array[java.lang.Long](e.targetId) ))
+								// concatenate lists
+								.groupBy(0).reduce( (l1, l2) => AdjacencyList(l1.sourceId, l1.targetIds ++ l2.targetIds))
+		
+		// start iteration
+		val finalRanks = pagesWithRanks.iterateWithTermination(maxIterations) {
+			currentRanks => 
+				val newRanks = currentRanks
+								// distribute ranks to target pages
+								.join(adjacencyLists).where(0).equalTo(0)
+								.flatMap { x => for(targetId <- x._2.targetIds) yield Page(targetId, (x._1.rank / x._2.targetIds.length))}
+								// collect ranks and sum them up
+								.groupBy(0).aggregate(SUM, 1)
+								// apply dampening factor
+								.map { p => Page(p.pageId, (p.rank * DAMPENING_FACTOR) + ((1 - DAMPENING_FACTOR) / numPages) ) }
+				
+				// terminate if no rank update was significant
+				val termination = currentRanks
+									.join(newRanks).where(0).equalTo(0)
+									// check for significant update
+									.filter( x => math.abs(x._1.rank - x._2.rank) > EPSILON )
+				
+				(newRanks, termination)
+		}
+		
+		val result = finalRanks;
+								
+		// emit result
+		if (fileOutput) {
+			result.writeAsCsv(outputPath, "\n", " ")
+		} else {
+			result.print()
+		}
+		
+		// execute program
+		env.execute("Basic PageRank Example")
+	}
+	
+	// *************************************************************************
+	//     USER TYPES
+	// *************************************************************************
+
+	case class Link(sourceId: Long, targetId: Long)
+	case class Page(pageId: java.lang.Long, rank: Double)
+	case class AdjacencyList(sourceId: java.lang.Long, targetIds: Array[java.lang.Long])
+	
+	// *************************************************************************
+	//     UTIL METHODS
+	// *************************************************************************
+
+	private def parseParameters(args: Array[String]): Boolean = {
+		if (args.length > 0) {
+			fileOutput = true
+			if (args.length == 5) {
+				pagesInputPath = args(0)
+				linksInputPath = args(1)
+				outputPath = args(2)
+				numPages = args(3).toLong
+				maxIterations = args(4).toInt
+			} else {
+				System.err.println("Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
+				return false // explicit return: a bare `false` here would be discarded and the method would fall through to `true`
+			}
+		} else {
+			System.out.println("Executing PageRank Basic example with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: PageRankBasic <pages path> <links path> <output path> <num pages> <num iterations>");
+			
+			numPages = PageRankData.getNumberOfPages();
+		}
+		true
+	}
+
+	private def getPagesDataSet(env: ExecutionEnvironment): DataSet[Long] = {
+		if(fileOutput) {
+			env.readCsvFile[Tuple1[Long]](pagesInputPath, fieldDelimiter = ' ', lineDelimiter = "\n")
+				.map(x => x._1)
+		} else {
+			env.fromCollection(Seq.range(1, PageRankData.getNumberOfPages()+1))
+		}
+	}
+	
+	private def getLinksDataSet(env: ExecutionEnvironment): DataSet[Link] = {
+		if (fileOutput) {
+			env.readCsvFile[(Long, Long)](linksInputPath, fieldDelimiter = ' ', includedFields = Array(0, 1))
+				.map { x => Link(x._1, x._2) }
+		} else {
+			val edges = PageRankData.EDGES.map{ case Array(v1, v2) => Link(v1.asInstanceOf[Long], v2.asInstanceOf[Long]) }
+			env.fromCollection(edges)
+		}
+	}	
+	
+	private var fileOutput: Boolean = false
+	private var pagesInputPath: String = null
+	private var linksInputPath: String = null
+	private var outputPath: String = null
+	private var numPages: Long = 0;
+	private var maxIterations: Int = 10;
+
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankWithWeight.scala
----------------------------------------------------------------------
diff --git a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankWithWeight.scala b/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankWithWeight.scala
deleted file mode 100644
index 97ee62e..0000000
--- a/flink-examples/flink-scala-examples/src/main/scala/org/apache/flink/examples/scala/graph/PageRankWithWeight.scala
+++ /dev/null
@@ -1,108 +0,0 @@
-///**
-// * Licensed to the Apache Software Foundation (ASF) under one
-// * or more contributor license agreements.  See the NOTICE file
-// * distributed with this work for additional information
-// * regarding copyright ownership.  The ASF licenses this file
-// * to you under the Apache License, Version 2.0 (the
-// * "License"); you may not use this file except in compliance
-// * with the License.  You may obtain a copy of the License at
-// *
-// *     http://www.apache.org/licenses/LICENSE-2.0
-// *
-// * Unless required by applicable law or agreed to in writing, software
-// * distributed under the License is distributed on an "AS IS" BASIS,
-// * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// * See the License for the specific language governing permissions and
-// * limitations under the License.
-// */
-//
-//package org.apache.flink.examples.scala.graph;
-//
-//import scala.math._
-//
-//import org.apache.flink.api.scala._
-//import org.apache.flink.api.scala.operators._
-//import org.apache.flink.api.scala.analysis.GlobalSchemaPrinter
-//
-//import org.apache.flink.client.LocalExecutor
-//import org.apache.flink.api.common.Program
-//import org.apache.flink.api.common.ProgramDescription
-//import org.apache.flink.api.common.Plan
-//import org.apache.flink.api.java.record.operators.DeltaIteration
-//
-///**
-// * An implementation of the PageRank algorithm for graph vertex ranking. Runs a specified fix number
-// * of iterations. This version of page rank expects the edges to define a transition
-// * probability and hence allows to model situations where not all outgoing links are equally probable.
-// *
-// * <p>
-// *
-// * Expects inputs are:
-// *  1. Path to a file of node ids, as a sequence of Longs, line delimited.
-// *  2. Path to a csv file of edges in the format <tt>sourceId targetId transitionProbability</tt> (fields separated by spaces).
-// *    The ids are expected to be Longs, the transition probability a float or double.
-// *  3. Path to where the output should be written
-// *  4. The number of vertices
-// *  5. The number of iterations
-// */
-//class PageRankWithWeight extends Program with Serializable {
-//
-//  def getScalaPlan(verticesPath: String, edgesPath: String, outputPath: String, numVertices: Long, maxIterations: Int) = {
-//
-//    case class PageWithRank(pageId: Long, rank: Double)
-//    case class Edge(from: Long, to: Long, transitionProb: Double)
-//
-//    val pages = DataSource(verticesPath, CsvInputFormat[Long]())
-//    val edges = DataSource(edgesPath, CsvInputFormat[Edge]("\n", ' ')) // line delimiter (\n), field delimiter (' ')
-//
-//    val dampening = 0.85
-//    val randomJump = (1.0 - dampening) / numVertices
-//    val initialRank = 1.0 / numVertices
-//
-//    val pagesWithRank = pages map { p => PageWithRank(p, initialRank) }
-//
-//    def computeRank(ranks: DataSetOLD[PageWithRank]) = {
-//
-//      val ranksForNeighbors = ranks join edges where { _.pageId } isEqualTo { _.from } map { (p, e) => (e.to, p.rank * e.transitionProb) }
-//
-//      ranksForNeighbors.groupBy { case (node, rank) => node }
-//        .reduce { (a, b) => (a._1, a._2 + b._2) }
-//        .map { case (node, rank) => PageWithRank(node, rank * dampening + randomJump) }
-//    }
-//
-//    val finalRanks = pagesWithRank.iterate(maxIterations, computeRank)
-//
-//    val output = finalRanks.write(outputPath, CsvOutputFormat())
-//
-//    new ScalaPlan(Seq(output), "Connected Components")
-//  }
-//
-//  override def getPlan(args: String*) = {
-//    val planArgs: Array[String] = if (args.length < 5) Array[String]("", "", "", "", "") else args.toArray
-//    val dop = if (args.size > 5) args(5).toInt else 1
-//
-//    val plan = getScalaPlan(planArgs(0), planArgs(1), planArgs(2), planArgs(3).toLong, planArgs(4).toInt)
-//    plan.setDefaultParallelism(dop)
-//    plan
-//  }
-//}
-//
-///**
-// * Executable entry point to run the program locally.
-// */
-//object RunPageRankWithWeight {
-//
-//  def main(args: Array[String]) {
-//    if (args.size < 5) {
-//      println("PageRank <vertices> <edges> <result> <numVertices> <numIterations> [<parallelism=1>]")
-//      return
-//    }
-//
-//    val dop = if (args.length > 5) args(5).toInt else 1
-//    val plan = new PageRankWithWeight().getScalaPlan(args(0), args(1), args(2), args(3).toLong, args(4).toInt);
-//
-//    plan.setDefaultParallelism(dop)
-//    LocalExecutor.execute(plan)
-//  }
-//}
-//

http://git-wip-us.apache.org/repos/asf/incubator-flink/blob/0dc76147/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
index 241a376..eb6fe9f 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/compiler/iterations/PageRankCompilerTest.java
@@ -23,7 +23,6 @@ import static org.apache.flink.api.java.aggregation.Aggregations.SUM;
 import static org.junit.Assert.fail;
 
 import org.apache.flink.api.common.Plan;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.compiler.PactCompiler;
 import org.apache.flink.compiler.plan.BulkIterationPlanNode;
@@ -53,8 +52,7 @@ public class PageRankCompilerTest extends CompilerTestBase{
 			final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 			
 			// get input data
-			@SuppressWarnings("unchecked")
-			DataSet<Tuple1<Long>> pagesInput = env.fromElements(new Tuple1<Long>(1l));
+			DataSet<Long> pagesInput = env.fromElements(1l);
 			@SuppressWarnings("unchecked")
 			DataSet<Tuple2<Long, Long>> linksInput =env.fromElements(new Tuple2<Long, Long>(1l, 2l));