You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/03/02 23:33:22 UTC

[1/2] flink git commit: [FLINK-4896] [gelly] PageRank algorithm for directed graphs

Repository: flink
Updated Branches:
  refs/heads/master 438276de8 -> cb9e409b7


[FLINK-4896] [gelly] PageRank algorithm for directed graphs

Adds a PageRank algorithm using Flink transformation that handles source
and sink vertices (in- or out-degree of zero). The scatter-gather and
gather-sum-apply PageRank implementations are moved to Gelly examples.

This closes #2733


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea14053f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea14053f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea14053f

Branch: refs/heads/master
Commit: ea14053fe32280ffc36e586b5d3712c751fa1f84
Parents: 438276d
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Oct 24 16:14:16 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Mar 2 16:41:00 2017 -0500

----------------------------------------------------------------------
 docs/dev/libs/gelly/library_methods.md          |  46 +-
 .../flink/graph/examples/GSAPageRank.java       | 123 +++++
 .../apache/flink/graph/examples/PageRank.java   | 125 +++++
 .../flink/graph/library/PageRankITCase.java     | 127 -----
 .../graph/test/examples/PageRankITCase.java     | 129 +++++
 .../apache/flink/graph/library/GSAPageRank.java | 123 -----
 .../apache/flink/graph/library/PageRank.java    | 125 -----
 .../graph/library/link_analysis/Functions.java  |  43 ++
 .../flink/graph/library/link_analysis/HITS.java |  47 +-
 .../graph/library/link_analysis/PageRank.java   | 534 +++++++++++++++++++
 .../graph/library/link_analysis/HITSTest.java   |  82 ++-
 .../library/link_analysis/PageRankTest.java     | 135 +++++
 12 files changed, 1194 insertions(+), 445 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/docs/dev/libs/gelly/library_methods.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/library_methods.md b/docs/dev/libs/gelly/library_methods.md
index e2288b4..94eee2e 100644
--- a/docs/dev/libs/gelly/library_methods.md
+++ b/docs/dev/libs/gelly/library_methods.md
@@ -143,29 +143,6 @@ The constructor takes one parameter:
 
 * `maxIterations`: the maximum number of iterations to run.
 
-## PageRank
-
-#### Overview
-An implementation of a simple [PageRank algorithm](https://en.wikipedia.org/wiki/PageRank), using [scatter-gather iterations](#scatter-gather-iterations).
-PageRank is an algorithm that was first used to rank web search engine results. Today, the algorithm and many variations, are used in various graph application domains. The idea of PageRank is that important or relevant pages tend to link to other important pages.
-
-#### Details
-The algorithm operates in iterations, where pages distribute their scores to their neighbors (pages they have links to) and subsequently update their scores based on the partial values they receive. The implementation assumes that each page has at least one incoming and one outgoing link.
-In order to consider the importance of a link from one page to another, scores are divided by the total number of out-links of the source page. Thus, a page with 10 links will distribute 1/10 of its score to each neighbor, while a page with 100 links, will distribute 1/100 of its score to each neighboring page. This process computes what is often called the transition probablities, i.e. the probability that some page will lead to other page while surfing the web. To correctly compute the transition probabilities, this implementation expects the edge values to be initialised to 1.0.
-
-#### Usage
-The algorithm takes as input a `Graph` with any vertex type, `Double` vertex values, and `Double` edge values. Edges values should be initialized to 1.0, in order to correctly compute the transition probabilities. Otherwise, the transition probability for an Edge `(u, v)` will be set to the edge value divided by `u`'s out-degree. The algorithm returns a `DataSet` of vertices, where the vertex value corresponds to assigned rank after convergence (or maximum iterations).
-The constructors take the following parameters:
-
-* `beta`: the damping factor.
-* `maxIterations`: the maximum number of iterations to run.
-
-## GSA PageRank
-
-The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-apply-iterations).
-
-See the [PageRank](#pagerank) library method for implementation details and usage information.
-
 ## Single Source Shortest Paths
 
 #### Overview
@@ -353,6 +330,29 @@ The algorithm takes a directed graph as input and outputs a `DataSet` of `Tuple3
 and authority score. Termination is configured with a maximum number of iterations and/or a convergence threshold
 on the sum of the change in each score for each vertex between iterations.
 
+* `setParallelism`: override the operator parallelism
+
+### PageRank
+
+#### Overview
+[PageRank](https://en.wikipedia.org/wiki/PageRank) is an algorithm that was first used to rank web search engine
+results. Today, the algorithm and many variations are used in various graph application domains. The idea of PageRank is
+that important or relevant vertices tend to link to other important vertices.
+
+#### Details
+The algorithm operates in iterations, where pages distribute their scores to their neighbors (pages they have links to)
+and subsequently update their scores based on the sum of values they receive. In order to consider the importance of a
+link from one page to another, scores are divided by the total number of out-links of the source page. Thus, a page with
+10 links will distribute 1/10 of its score to each neighbor, while a page with 100 links will distribute 1/100 of its
+score to each neighboring page.
+
+#### Usage
+The algorithm takes a directed graph as input and outputs a `DataSet` where each `Result` contains the vertex ID and
+PageRank score. Termination is configured with a maximum number of iterations and/or a convergence threshold
+on the sum of the change in score for each vertex between iterations.
+
+* `setParallelism`: override the operator parallelism
+
 ## Metric
 
 ### Vertex Metrics

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSAPageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSAPageRank.java
new file mode 100644
index 0000000..db2e4f2
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GSAPageRank.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeJoinFunction;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GSAConfiguration;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.types.LongValue;
+
+/**
+ * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
+ * The user can define the damping factor and the maximum number of iterations.
+ *
+ * The implementation assumes that each page has at least one incoming and one outgoing link.
+ */
+public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
+
+	private double beta;
+	private int maxIterations;
+
+	/**
+	 * Creates an instance of the GSA PageRank algorithm.
+	 *
+	 * The implementation assumes that each page has at least one incoming and one outgoing link.
+	 * 
+	 * @param beta the damping factor
+	 * @param maxIterations the maximum number of iterations
+	 */
+	public GSAPageRank(double beta, int maxIterations) {
+		this.beta = beta;
+		this.maxIterations = maxIterations;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
+		DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();
+
+		Graph<K, Double, Double> networkWithWeights = network
+				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
+
+		GSAConfiguration parameters = new GSAConfiguration();
+		parameters.setOptNumVertices(true);
+
+		return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(), new SumRanks(),
+				new UpdateRanks<K>(beta), maxIterations, parameters)
+				.getVertices();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Page Rank UDFs
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	private static final class GatherRanks extends GatherFunction<Double, Double, Double> {
+
+		@Override
+		public Double gather(Neighbor<Double, Double> neighbor) {
+			double neighborRank = neighbor.getNeighborValue();
+
+			if(getSuperstepNumber() == 1) {
+				neighborRank = 1.0 / this.getNumberOfVertices();
+			}
+
+			return neighborRank * neighbor.getEdgeValue();
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumRanks extends SumFunction<Double, Double, Double> {
+
+		@Override
+		public Double sum(Double newValue, Double currentValue) {
+			return newValue + currentValue;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class UpdateRanks<K> extends ApplyFunction<K, Double, Double> {
+
+		private final double beta;
+
+		public UpdateRanks(double beta) {
+			this.beta = beta;
+		}
+
+		@Override
+		public void apply(Double rankSum, Double currentValue) {
+			setResult((1-beta)/this.getNumberOfVertices() + beta * rankSum);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {
+
+		public Double edgeJoin(Double edgeValue, LongValue inputValue) {
+			return edgeValue / (double) inputValue.getValue();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PageRank.java
new file mode 100644
index 0000000..6be8116
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/PageRank.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.examples;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeJoinFunction;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.GatherFunction;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.ScatterFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
+import org.apache.flink.types.LongValue;
+
+/**
+ * This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration.
+ * The user can define the damping factor and the maximum number of iterations.
+ *
+ * The implementation assumes that each page has at least one incoming and one outgoing link.
+ */
+public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
+
+	private double beta;
+	private int maxIterations;
+
+	/**
+	 * Creates an instance of the PageRank algorithm.
+	 *
+	 * The implementation assumes that each page has at least one incoming and one outgoing link.
+	 * 
+	 * @param beta the damping factor
+	 * @param maxIterations the maximum number of iterations
+	 */
+	public PageRank(double beta, int maxIterations) {
+		this.beta = beta;
+		this.maxIterations = maxIterations;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
+		DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();
+
+		Graph<K, Double, Double> networkWithWeights = network
+				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
+
+		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
+		parameters.setOptNumVertices(true);
+
+		return networkWithWeights.runScatterGatherIteration(new RankMessenger<K>(),
+				new VertexRankUpdater<K>(beta), maxIterations, parameters)
+				.getVertices();
+	}
+
+	/**
+	 * Distributes the rank of a vertex among all target vertices according to
+	 * the transition probability, which is associated with an edge as the edge
+	 * value.
+	 */
+	@SuppressWarnings("serial")
+	public static final class RankMessenger<K> extends ScatterFunction<K, Double, Double, Double> {
+		@Override
+		public void sendMessages(Vertex<K, Double> vertex) {
+			if (getSuperstepNumber() == 1) {
+				// initialize vertex ranks
+				vertex.setValue(1.0 / this.getNumberOfVertices());
+			}
+
+			for (Edge<K, Double> edge : getEdges()) {
+				sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
+			}
+		}
+	}
+
+	/**
+	 * Function that updates the rank of a vertex by summing up the partial
+	 * ranks from all incoming messages and then applying the dampening formula.
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexRankUpdater<K> extends GatherFunction<K, Double, Double> {
+		private final double beta;
+
+		public VertexRankUpdater(double beta) {
+			this.beta = beta;
+		}
+
+		@Override
+		public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
+			double rankSum = 0.0;
+			for (double msg : inMessages) {
+				rankSum += msg;
+			}
+
+			// apply the dampening factor / random jump
+			double newRank = (beta * rankSum) + (1 - beta) / this.getNumberOfVertices();
+			setNewVertexValue(newRank);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {
+		public Double edgeJoin(Double edgeValue, LongValue inputValue) {
+			return edgeValue / (double) inputValue.getValue();
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
deleted file mode 100644
index 25a3e3f..0000000
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.examples.data.PageRankData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.Arrays;
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class PageRankITCase extends MultipleProgramsTestBase {
-
-	public PageRankITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Test
-	public void testPageRankWithThreeIterations() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-			PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
-
-		List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
-			.collect();
-
-		compareWithDelta(result, 0.01);
-	}
-
-	@Test
-	public void testGSAPageRankWithThreeIterations() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-			PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
-
-		List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
-			.collect();
-
-		compareWithDelta(result, 0.01);
-	}
-
-	@Test
-	public void testPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-			PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
-
-		List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
-			.collect();
-
-		compareWithDelta(result, 0.01);
-	}
-
-	@Test
-	public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
-			PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
-
-		List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
-			.collect();
-
-		compareWithDelta(result, 0.01);
-	}
-
-	private void compareWithDelta(List<Vertex<Long, Double>> result, double delta) {
-
-		String resultString = "";
-		for (Vertex<Long, Double> v : result) {
-			resultString += v.f0.toString() + "," + v.f1.toString() + "\n";
-		}
-
-		String expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS;
-		String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n");
-
-		String[] resultArray = resultString.isEmpty() ? new String[0] : resultString.split("\n");
-
-		Arrays.sort(expected);
-		Arrays.sort(resultArray);
-
-		for (int i = 0; i < expected.length; i++) {
-			String[] expectedFields = expected[i].split(",");
-			String[] resultFields = resultArray[i].split(",");
-
-			double expectedPayLoad = Double.parseDouble(expectedFields[1]);
-			double resultPayLoad = Double.parseDouble(resultFields[1]);
-
-			Assert.assertTrue("Values differ by more than the permissible delta",
-				Math.abs(expectedPayLoad - resultPayLoad) < delta);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class InitMapper implements MapFunction<Long, Double> {
-		public Double map(Long value) {
-			return 1.0;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/PageRankITCase.java
new file mode 100644
index 0000000..41f9a0f
--- /dev/null
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/test/examples/PageRankITCase.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.examples;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.examples.GSAPageRank;
+import org.apache.flink.graph.examples.PageRank;
+import org.apache.flink.graph.examples.data.PageRankData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class PageRankITCase extends MultipleProgramsTestBase {
+
+	public PageRankITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testPageRankWithThreeIterations() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+			PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+		List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
+			.collect();
+
+		compareWithDelta(result, 0.01);
+	}
+
+	@Test
+	public void testGSAPageRankWithThreeIterations() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+			PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+		List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
+			.collect();
+
+		compareWithDelta(result, 0.01);
+	}
+
+	@Test
+	public void testPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+			PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+		List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
+			.collect();
+
+		compareWithDelta(result, 0.01);
+	}
+
+	@Test
+	public void testGSAPageRankWithThreeIterationsAndNumOfVertices() throws Exception {
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
+			PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
+
+		List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
+			.collect();
+
+		compareWithDelta(result, 0.01);
+	}
+
+	private void compareWithDelta(List<Vertex<Long, Double>> result, double delta) {
+
+		String resultString = "";
+		for (Vertex<Long, Double> v : result) {
+			resultString += v.f0.toString() + "," + v.f1.toString() + "\n";
+		}
+
+		String expectedResult = PageRankData.RANKS_AFTER_3_ITERATIONS;
+		String[] expected = expectedResult.isEmpty() ? new String[0] : expectedResult.split("\n");
+
+		String[] resultArray = resultString.isEmpty() ? new String[0] : resultString.split("\n");
+
+		Arrays.sort(expected);
+		Arrays.sort(resultArray);
+
+		for (int i = 0; i < expected.length; i++) {
+			String[] expectedFields = expected[i].split(",");
+			String[] resultFields = resultArray[i].split(",");
+
+			double expectedPayLoad = Double.parseDouble(expectedFields[1]);
+			double resultPayLoad = Double.parseDouble(resultFields[1]);
+
+			Assert.assertTrue("Values differ by more than the permissible delta",
+				Math.abs(expectedPayLoad - resultPayLoad) < delta);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitMapper implements MapFunction<Long, Double> {
+		public Double map(Long value) {
+			return 1.0;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
deleted file mode 100644
index ef39395..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.EdgeJoinFunction;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GSAConfiguration;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.types.LongValue;
-
-/**
- * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
- * The user can define the damping factor and the maximum number of iterations.
- *
- * The implementation assumes that each page has at least one incoming and one outgoing link.
- */
-public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
-
-	private double beta;
-	private int maxIterations;
-
-	/**
-	 * Creates an instance of the GSA PageRank algorithm.
-	 *
-	 * The implementation assumes that each page has at least one incoming and one outgoing link.
-	 * 
-	 * @param beta the damping factor
-	 * @param maxIterations the maximum number of iterations
-	 */
-	public GSAPageRank(double beta, int maxIterations) {
-		this.beta = beta;
-		this.maxIterations = maxIterations;
-	}
-
-	@Override
-	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
-		DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();
-
-		Graph<K, Double, Double> networkWithWeights = network
-				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
-
-		GSAConfiguration parameters = new GSAConfiguration();
-		parameters.setOptNumVertices(true);
-
-		return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(), new SumRanks(),
-				new UpdateRanks<K>(beta), maxIterations, parameters)
-				.getVertices();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Page Rank UDFs
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static final class GatherRanks extends GatherFunction<Double, Double, Double> {
-
-		@Override
-		public Double gather(Neighbor<Double, Double> neighbor) {
-			double neighborRank = neighbor.getNeighborValue();
-
-			if(getSuperstepNumber() == 1) {
-				neighborRank = 1.0 / this.getNumberOfVertices();
-			}
-
-			return neighborRank * neighbor.getEdgeValue();
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumRanks extends SumFunction<Double, Double, Double> {
-
-		@Override
-		public Double sum(Double newValue, Double currentValue) {
-			return newValue + currentValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class UpdateRanks<K> extends ApplyFunction<K, Double, Double> {
-
-		private final double beta;
-
-		public UpdateRanks(double beta) {
-			this.beta = beta;
-		}
-
-		@Override
-		public void apply(Double rankSum, Double currentValue) {
-			setResult((1-beta)/this.getNumberOfVertices() + beta * rankSum);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {
-
-		public Double edgeJoin(Double edgeValue, LongValue inputValue) {
-			return edgeValue / (double) inputValue.getValue();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
deleted file mode 100644
index bf9b4e9..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ /dev/null
@@ -1,125 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeJoinFunction;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.GatherFunction;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.ScatterFunction;
-import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
-import org.apache.flink.types.LongValue;
-
-/**
- * This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration.
- * The user can define the damping factor and the maximum number of iterations.
- *
- * The implementation assumes that each page has at least one incoming and one outgoing link.
- */
-public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
-
-	private double beta;
-	private int maxIterations;
-
-	/**
-	 * Creates an instance of the PageRank algorithm.
-	 *
-	 * The implementation assumes that each page has at least one incoming and one outgoing link.
-	 * 
-	 * @param beta the damping factor
-	 * @param maxIterations the maximum number of iterations
-	 */
-	public PageRank(double beta, int maxIterations) {
-		this.beta = beta;
-		this.maxIterations = maxIterations;
-	}
-
-	@Override
-	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
-		DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();
-
-		Graph<K, Double, Double> networkWithWeights = network
-				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
-
-		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
-		parameters.setOptNumVertices(true);
-
-		return networkWithWeights.runScatterGatherIteration(new RankMessenger<K>(),
-				new VertexRankUpdater<K>(beta), maxIterations, parameters)
-				.getVertices();
-	}
-
-	/**
-	 * Distributes the rank of a vertex among all target vertices according to
-	 * the transition probability, which is associated with an edge as the edge
-	 * value.
-	 */
-	@SuppressWarnings("serial")
-	public static final class RankMessenger<K> extends ScatterFunction<K, Double, Double, Double> {
-		@Override
-		public void sendMessages(Vertex<K, Double> vertex) {
-			if (getSuperstepNumber() == 1) {
-				// initialize vertex ranks
-				vertex.setValue(1.0 / this.getNumberOfVertices());
-			}
-
-			for (Edge<K, Double> edge : getEdges()) {
-				sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
-			}
-		}
-	}
-
-	/**
-	 * Function that updates the rank of a vertex by summing up the partial
-	 * ranks from all incoming messages and then applying the dampening formula.
-	 */
-	@SuppressWarnings("serial")
-	public static final class VertexRankUpdater<K> extends GatherFunction<K, Double, Double> {
-		private final double beta;
-
-		public VertexRankUpdater(double beta) {
-			this.beta = beta;
-		}
-
-		@Override
-		public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
-			double rankSum = 0.0;
-			for (double msg : inMessages) {
-				rankSum += msg;
-			}
-
-			// apply the dampening factor / random jump
-			double newRank = (beta * rankSum) + (1 - beta) / this.getNumberOfVertices();
-			setNewVertexValue(newRank);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {
-		public Double edgeJoin(Double edgeValue, LongValue inputValue) {
-			return edgeValue / (double) inputValue.getValue();
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java
new file mode 100644
index 0000000..5bb2f4c
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.link_analysis;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.DoubleValue;
+
+class Functions {
+
+	/**
+	 * Sum vertices' scores.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0")
+	static class SumScore<T>
+		implements ReduceFunction<Tuple2<T, DoubleValue>> {
+		@Override
+		public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> left, Tuple2<T, DoubleValue> right)
+			throws Exception {
+			left.f1.setValue(left.f1.getValue() + right.f1.getValue());
+			return left;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index 1be55f0..ecc1ad7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -21,7 +21,6 @@ package org.apache.flink.graph.library.link_analysis;
 import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
 import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
 import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.functions.RichJoinFunction;
@@ -38,6 +37,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
 import org.apache.flink.graph.library.link_analysis.HITS.Result;
 import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
@@ -50,8 +50,6 @@ import java.util.Collection;
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * http://www.cs.cornell.edu/home/kleinber/auth.pdf
- *
  * Hyperlink-Induced Topic Search computes two interdependent scores for every
  * vertex in a directed graph. A good "hub" links to good "authorities" and
  * good "authorities" are linked from good "hubs".
@@ -59,6 +57,8 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * This algorithm can be configured to terminate either by a limit on the number
  * of iterations, a convergence threshold, or both.
  *
+ * http://www.cs.cornell.edu/home/kleinber/auth.pdf
+ *
  * @param <K> graph ID type
  * @param <VV> vertex value type
  * @param <EV> edge value type
@@ -91,7 +91,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	/**
 	 * Hyperlink-Induced Topic Search with a convergence threshold. The algorithm
-	 * terminates When the total change in hub and authority scores over all
+	 * terminates when the total change in hub and authority scores over all
 	 * vertices falls to or below the given threshold value.
 	 *
 	 * @param convergenceThreshold convergence threshold for sum of scores
@@ -154,13 +154,12 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		return true;
 	}
 
-
 	@Override
 	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {
 		DataSet<Tuple2<K, K>> edges = input
 			.getEdges()
-			.flatMap(new ExtractEdgeIDs<K, EV>())
+			.map(new ExtractEdgeIDs<K, EV>())
 				.setParallelism(parallelism)
 				.name("Extract edge IDs");
 
@@ -270,15 +269,15 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 */
 	@ForwardedFields("0; 1")
 	private static class ExtractEdgeIDs<T, ET>
-	implements FlatMapFunction<Edge<T, ET>, Tuple2<T, T>> {
+	implements MapFunction<Edge<T, ET>, Tuple2<T, T>> {
 		private Tuple2<T, T> output = new Tuple2<>();
 
 		@Override
-		public void flatMap(Edge<T, ET> value, Collector<Tuple2<T, T>> out)
+		public Tuple2<T, T> map(Edge<T, ET> value)
 				throws Exception {
 			output.f0 = value.f0;
 			output.f1 = value.f1;
-			out.collect(output);
+			return output;
 		}
 	}
 
@@ -308,8 +307,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@ForwardedFieldsFirst("0")
-	@ForwardedFieldsSecond("0")
+	@ForwardedFields("0")
 	private static class SumScores<T>
 	implements ReduceFunction<Tuple3<T, DoubleValue, DoubleValue>> {
 		@Override
@@ -345,23 +343,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	}
 
 	/**
-	 * Sum vertices' scores.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFieldsFirst("0")
-	@ForwardedFieldsSecond("0")
-	private static class SumScore<T>
-	implements ReduceFunction<Tuple2<T, DoubleValue>> {
-		@Override
-		public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> left, Tuple2<T, DoubleValue> right)
-				throws Exception {
-			left.f1.setValue(left.f1.getValue() + right.f1.getValue());
-			return left;
-		}
-	}
-
-	/**
 	 * The authority score is the sum of hub scores of vertices on in-edges.
 	 *
 	 * @param <T> ID type
@@ -469,7 +450,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		private double changeInScores;
 
 		@Override
-		public void open(Configuration parameters) throws Exception {
+		public void open(Configuration parameters)
+				throws Exception {
 			super.open(parameters);
 
 			isInitialSuperstep = (getIterationRuntimeContext().getSuperstepNumber() == 1);
@@ -477,7 +459,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		}
 
 		@Override
-		public void close() throws Exception {
+		public void close()
+				throws Exception {
 			super.close();
 
 			DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES);
@@ -498,8 +481,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	/**
 	 * Monitors the total change in hub and authority scores over all vertices.
-	 * The iteration terminates when the change in scores compared against the
-	 * prior iteration falls below the given convergence threshold.
+	 * The algorithm terminates when the change in scores compared against the
+	 * prior iteration falls to or below the given convergence threshold.
 	 *
 	 * An optimization of this implementation of HITS is to leave the initial
 	 * scores non-normalized; therefore, the change in scores after the first

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
new file mode 100644
index 0000000..514fd4e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
@@ -0,0 +1,534 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.link_analysis;
+
+import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.directed.EdgeSourceDegrees;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
+import org.apache.flink.graph.library.link_analysis.PageRank.Result;
+import org.apache.flink.graph.utils.GraphUtils;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * PageRank computes a per-vertex score which is the sum of PageRank scores
+ * transmitted over in-edges. Each vertex's score is divided evenly among
+ * out-edges. High-scoring vertices are linked to by other high-scoring
+ * vertices; this is similar to the 'authority' score in {@link HITS}.
+ *
+ * http://ilpubs.stanford.edu:8090/422/1/1999-66.pdf
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class PageRank<K, VV, EV>
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
+
+	private static final String VERTEX_COUNT = "vertex count";
+
+	private static final String SUM_OF_SCORES = "sum of scores";
+
+	private static final String CHANGE_IN_SCORES = "change in scores";
+
+	// Required configuration
+	private final double dampingFactor;
+
+	private int maxIterations;
+
+	private double convergenceThreshold;
+
+	// Optional configuration
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * PageRank with a fixed number of iterations.
+	 *
+	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
+	 * @param iterations fixed number of iterations
+	 */
+	public PageRank(double dampingFactor, int iterations) {
+		this(dampingFactor, iterations, Double.MAX_VALUE);
+	}
+
+	/**
+	 * PageRank with a convergence threshold. The algorithm terminates when the
+	 * change in score over all vertices falls to or below the given threshold value.
+	 *
+	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
+	 * @param convergenceThreshold convergence threshold for sum of scores
+	 */
+	public PageRank(double dampingFactor, double convergenceThreshold) {
+		this(dampingFactor, Integer.MAX_VALUE, convergenceThreshold);
+	}
+
+	/**
+	 * PageRank with a convergence threshold and a maximum iteration count. The
+	 * algorithm terminates after either the given number of iterations or when
+	 * the change in score over all vertices falls to or below the given
+	 * threshold value.
+	 *
+	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
+	 * @param maxIterations maximum number of iterations
+	 * @param convergenceThreshold convergence threshold for sum of scores
+	 */
+	public PageRank(double dampingFactor, int maxIterations, double convergenceThreshold) {
+		Preconditions.checkArgument(0 < dampingFactor && dampingFactor < 1,
+			"Damping factor must be between zero and one");
+		Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero");
+		Preconditions.checkArgument(convergenceThreshold > 0.0, "Convergence threshold must be greater than zero");
+
+		this.dampingFactor = dampingFactor;
+		this.maxIterations = maxIterations;
+		this.convergenceThreshold = convergenceThreshold;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public PageRank<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	protected String getAlgorithmName() {
+		return PageRank.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (! PageRank.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		PageRank rhs = (PageRank) other;
+
+		// merge configurations
+
+		maxIterations = Math.max(maxIterations, rhs.maxIterations);
+		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
+		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
+			throws Exception {
+		// vertex degree
+		DataSet<Vertex<K, Degrees>> vertexDegree = input
+			.run(new VertexDegrees<K, VV, EV>()
+				.setParallelism(parallelism));
+
+		// vertex count
+		DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree);
+
+		// s, t, d(s)
+		DataSet<Edge<K, LongValue>> edgeSourceDegree = input
+			.run(new EdgeSourceDegrees<K, VV, EV>()
+				.setParallelism(parallelism))
+			.map(new ExtractSourceDegree<K, EV>())
+				.setParallelism(parallelism)
+				.name("Extract source degree");
+
+		// vertices with zero in-edges
+		DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree
+			.flatMap(new InitializeSourceVertices<K>())
+			.withBroadcastSet(vertexCount, VERTEX_COUNT)
+				.setParallelism(parallelism)
+				.name("Initialize source vertex scores");
+
+		// s, initial pagerank(s)
+		DataSet<Tuple2<K, DoubleValue>> initialScores = vertexDegree
+			.map(new InitializeVertexScores<K>())
+			.withBroadcastSet(vertexCount, VERTEX_COUNT)
+				.setParallelism(parallelism)
+				.name("Initialize scores");
+
+		IterativeDataSet<Tuple2<K, DoubleValue>> iterative = initialScores
+			.iterate(maxIterations);
+
+		// s, projected pagerank(s)
+		DataSet<Tuple2<K, DoubleValue>> vertexScores = iterative
+			.coGroup(edgeSourceDegree)
+			.where(0)
+			.equalTo(0)
+			.with(new SendScore<K>())
+				.setParallelism(parallelism)
+				.name("Send score")
+			.groupBy(0)
+			.reduce(new SumScore<K>())
+				.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// ignored ID, total pagerank
+		DataSet<Tuple2<K, DoubleValue>> sumOfScores = vertexScores
+			.reduce(new SumVertexScores<K>())
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// s, adjusted pagerank(s)
+		DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores
+			.union(sourceVertices)
+				.setParallelism(parallelism)
+				.name("Union with source vertices")
+			.map(new AdjustScores<K>(dampingFactor))
+				.withBroadcastSet(sumOfScores, SUM_OF_SCORES)
+				.withBroadcastSet(vertexCount, VERTEX_COUNT)
+					.setParallelism(parallelism)
+					.name("Adjust scores");
+
+		DataSet<Tuple2<K, DoubleValue>> passThrough;
+
+		if (convergenceThreshold < Double.MAX_VALUE) {
+			passThrough = iterative
+				.join(adjustedScores)
+				.where(0)
+				.equalTo(0)
+				.with(new ChangeInScores<K>())
+					.setParallelism(parallelism)
+					.name("Change in scores");
+
+			iterative.registerAggregationConvergenceCriterion(CHANGE_IN_SCORES, new DoubleSumAggregator(), new ScoreConvergence(convergenceThreshold));
+		} else {
+			passThrough = adjustedScores;
+		}
+
+		return iterative
+			.closeWith(passThrough)
+			.map(new TranslateResult<K>())
+				.setParallelism(parallelism)
+				.name("Map result");
+	}
+
+	/**
+	 * Remove the unused original edge value and extract the out-degree.
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	@ForwardedFields("0; 1")
+	private static class ExtractSourceDegree<T, ET>
+	implements MapFunction<Edge<T, Tuple2<ET, Degrees>>, Edge<T, LongValue>> {
+		Edge<T, LongValue> output = new Edge<>();
+
+		@Override
+		public Edge<T, LongValue> map(Edge<T, Tuple2<ET, Degrees>> edge)
+				throws Exception {
+			output.f0 = edge.f0;
+			output.f1 = edge.f1;
+			output.f2 = edge.f2.f1.getOutDegree();
+			return output;
+		}
+	}
+
+	/**
+	 * Source vertices have no in-edges so have a projected score of 0.0.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0")
+	private static class InitializeSourceVertices<T>
+	implements FlatMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> {
+		private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue(0.0));
+
+		@Override
+		public void flatMap(Vertex<T, Degrees> vertex, Collector<Tuple2<T, DoubleValue>> out)
+				throws Exception {
+			if (vertex.f1.getInDegree().getValue() == 0) {
+				output.f0 = vertex.f0;
+				out.collect(output);
+			}
+		}
+	}
+
+	/**
+	 * PageRank scores sum to 1.0 so initialize each vertex with the inverse of
+	 * the number of vertices.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0")
+	private static class InitializeVertexScores<T>
+	extends RichMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> {
+		private Tuple2<T, DoubleValue> output = new Tuple2<>();
+
+		@Override
+		public void open(Configuration parameters)
+				throws Exception {
+			super.open(parameters);
+
+			Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
+			output.f1 = new DoubleValue(1.0 / vertexCount.iterator().next().getValue());
+		}
+
+		@Override
+		public Tuple2<T, DoubleValue> map(Vertex<T, Degrees> vertex)
+				throws Exception {
+			output.f0 = vertex.f0;
+			return output;
+		}
+	}
+
+	/**
+	 * The PageRank score for each vertex is divided evenly and projected to
+	 * neighbors on out-edges.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFieldsSecond("1->0")
+	private static class SendScore<T>
+	implements CoGroupFunction<Tuple2<T, DoubleValue>, Edge<T, LongValue>, Tuple2<T, DoubleValue>> {
+		private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue());
+
+		@Override
+		public void coGroup(Iterable<Tuple2<T, DoubleValue>> vertex, Iterable<Edge<T, LongValue>> edges, Collector<Tuple2<T, DoubleValue>> out)
+				throws Exception {
+			Iterator<Edge<T, LongValue>> edgeIterator = edges.iterator();
+
+			if (edgeIterator.hasNext()) {
+				Edge<T, LongValue> edge = edgeIterator.next();
+
+				output.f0 = edge.f1;
+				output.f1.setValue(vertex.iterator().next().f1.getValue() / edge.f2.getValue());
+				out.collect(output);
+
+				while (edgeIterator.hasNext()) {
+					edge = edgeIterator.next();
+					output.f0 = edge.f1;
+					out.collect(output);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Sum the PageRank score over all vertices. The vertex ID must be ignored
+	 * but is retained rather than adding another operator.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0")
+	private static class SumVertexScores<T>
+	implements ReduceFunction<Tuple2<T, DoubleValue>> {
+		@Override
+		public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second)
+				throws Exception {
+			first.f1.setValue(first.f1.getValue() + second.f1.getValue());
+			return first;
+		}
+	}
+
+	/**
+	 * Each iteration the per-vertex scores are adjusted with the damping
+	 * factor. Each score is multiplied by the damping factor then added to the
+	 * probability of a "random hop", which is one minus the damping factor.
+	 *
+	 * This operation also accounts for 'sink' vertices, which have no
+	 * out-edges to project score to. The sink scores are computed by taking
+	 * one minus the sum of vertex scores, which also includes precision error.
+	 * This 'missing' score is evenly distributed across vertices as with the
+	 * random hop.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0")
+	private static class AdjustScores<T>
+	extends RichMapFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
+		private double dampingFactor;
+
+		private long vertexCount;
+
+		private double uniformlyDistributedScore;
+
+		public AdjustScores(double dampingFactor) {
+			this.dampingFactor = dampingFactor;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			Collection<Tuple2<T, DoubleValue>> sumOfScores = getRuntimeContext().getBroadcastVariable(SUM_OF_SCORES);
+			// floating point precision error is also included in sumOfSinks
+			double sumOfSinks = 1 - sumOfScores.iterator().next().f1.getValue();
+
+			Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
+			this.vertexCount = vertexCount.iterator().next().getValue();
+
+			this.uniformlyDistributedScore = ((1 - dampingFactor) + dampingFactor * sumOfSinks) / this.vertexCount;
+		}
+
+		@Override
+		public Tuple2<T, DoubleValue> map(Tuple2<T, DoubleValue> value) throws Exception {
+			value.f1.setValue(uniformlyDistributedScore + (dampingFactor * value.f1.getValue()));
+			return value;
+		}
+	}
+
+	/**
+	 * Computes the sum of the absolute change in vertex PageRank scores
+	 * between iterations.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFieldsFirst("0")
+	@ForwardedFieldsSecond("*")
+	private static class ChangeInScores<T>
+	extends RichJoinFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
+		private double changeInScores;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			changeInScores = 0.0;
+		}
+
+		@Override
+		public void close()
+				throws Exception {
+			super.close();
+
+			DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES);
+			agg.aggregate(changeInScores);
+		}
+
+		@Override
+		public Tuple2<T, DoubleValue> join(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second)
+				throws Exception {
+			changeInScores += Math.abs(second.f1.getValue() - first.f1.getValue());
+			return second;
+		}
+	}
+
+	/**
+	 * Monitors the sum of the absolute change in vertex scores. The algorithm
+	 * terminates when the change in scores compared against the prior iteration
+	 * falls to or below the given convergence threshold.
+	 */
+	private static class ScoreConvergence
+	implements ConvergenceCriterion<DoubleValue> {
+		private double convergenceThreshold;
+
+		public ScoreConvergence(double convergenceThreshold) {
+			this.convergenceThreshold = convergenceThreshold;
+		}
+
+		@Override
+		public boolean isConverged(int iteration, DoubleValue value) {
+			double val = value.getValue();
+			return (val <= convergenceThreshold);
+		}
+	}
+
+	/**
+	 * Map the Tuple result to the return type.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0; 1")
+	private static class TranslateResult<T>
+		implements MapFunction<Tuple2<T, DoubleValue>, Result<T>> {
+		private Result<T> output = new Result<>();
+
+		@Override
+		public Result<T> map(Tuple2<T, DoubleValue> value) throws Exception {
+			output.f0 = value.f0;
+			output.f1 = value.f1;
+			return output;
+		}
+	}
+
+	/**
+	 * Wraps the {@link Tuple2} to encapsulate results from the PageRank algorithm.
+	 *
+	 * @param <T> ID type
+	 */
+	public static class Result<T>
+	extends Tuple2<T, DoubleValue> {
+		public static final int HASH_SEED = 0x4010af29;
+
+		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+
+		public T getVertexId0() {
+			return f0;
+		}
+
+		/**
+		 * Get the PageRank score.
+		 *
+		 * @return the PageRank score
+		 */
+		public DoubleValue getPageRankScore() {
+			return f1;
+		}
+
+		public String toVerboseString() {
+			return "Vertex ID: " + getVertexId0()
+				+ ", PageRank score: " + getPageRankScore();
+		}
+
+		@Override
+		public int hashCode() {
+			return hasher.reset()
+				.hash(f0.hashCode())
+				.hash(f1.getValue())
+				.hash();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
index b09f95c..e9db838 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/HITSTest.java
@@ -21,8 +21,7 @@ package org.apache.flink.graph.library.link_analysis;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.asm.AsmTestBase;
-import org.apache.flink.graph.asm.dataset.ChecksumHashCode;
-import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
+import org.apache.flink.graph.asm.dataset.Collect;
 import org.apache.flink.graph.library.link_analysis.HITS.Result;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -30,26 +29,43 @@ import org.apache.flink.types.NullValue;
 import org.junit.Test;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import static org.junit.Assert.assertEquals;
 
 public class HITSTest
 extends AsmTestBase {
 
+	/*
+	 * This test result can be verified with the following Python script.
+
+	import math
+	import networkx as nx
+
+	graph=nx.read_edgelist('directedSimpleGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	hits=nx.algorithms.link_analysis.hits(graph)
+
+	hubbiness_norm=math.sqrt(sum(v*v for v in hits[0].values()))
+	authority_norm=math.sqrt(sum(v*v for v in hits[1].values()))
+
+	for key in sorted(hits[0]):
+		print('{}: {}, {}'.format(key, hits[0][key]/hubbiness_norm, hits[1][key]/authority_norm))
+	 */
 	@Test
 	public void testWithSimpleGraph()
 			throws Exception {
-		DataSet<Result<IntValue>> hits = new HITS<IntValue, NullValue, NullValue>(10)
+		DataSet<Result<IntValue>> hits = new HITS<IntValue, NullValue, NullValue>(20)
 			.run(directedSimpleGraph);
 
 		List<Tuple2<Double, Double>> expectedResults = new ArrayList<>();
-		expectedResults.add(Tuple2.of(0.5446287864731747, 0.0));
-		expectedResults.add(Tuple2.of(0.0, 0.8363240238999012));
-		expectedResults.add(Tuple2.of(0.6072453524686667, 0.26848532437604833));
-		expectedResults.add(Tuple2.of(0.5446287864731747, 0.39546603929699625));
-		expectedResults.add(Tuple2.of(0.0, 0.26848532437604833));
-		expectedResults.add(Tuple2.of(0.194966796646811, 0.0));
+		expectedResults.add(Tuple2.of(0.544643396306, 0.0));
+		expectedResults.add(Tuple2.of(0.0, 0.836329395866));
+		expectedResults.add(Tuple2.of(0.607227031134, 0.268492526138));
+		expectedResults.add(Tuple2.of(0.544643396306, 0.395444899355));
+		expectedResults.add(Tuple2.of(0.0, 0.268492526138));
+		expectedResults.add(Tuple2.of(0.194942233447, 0.0));
 
 		for (Result<IntValue> result : hits.collect()) {
 			int id = result.f0.getValue();
@@ -76,17 +92,53 @@ extends AsmTestBase {
 		}
 	}
 
+	/*
+	 * This test result can be verified with the following Python script.
+
+	import math
+	import networkx as nx
+
+	graph=nx.read_edgelist('directedRMatGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	hits=nx.algorithms.link_analysis.hits(graph)
+
+	hubbiness_norm=math.sqrt(sum(v*v for v in hits[0].values()))
+	authority_norm=math.sqrt(sum(v*v for v in hits[1].values()))
+
+	for key in [0, 1, 2, 8, 13, 29, 109, 394, 652, 1020]:
+		print('{}: {}, {}'.format(key, hits[0][str(key)]/hubbiness_norm, hits[1][str(key)]/authority_norm))
+	 */
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
 		DataSet<Result<LongValue>> hits = directedRMatGraph
-			.run(new HITS<LongValue, NullValue, NullValue>(1));
+			.run(new HITS<LongValue, NullValue, NullValue>(0.000001));
 
-		Checksum checksum = new ChecksumHashCode<Result<LongValue>>()
-			.run(hits)
-			.execute();
+		Map<Long, Result<LongValue>> results = new HashMap<>();
+		for (Result<LongValue> result :  new Collect<Result<LongValue>>().run(hits).execute()) {
+			results.put(result.f0.getValue(), result);
+		}
 
-		assertEquals(902, checksum.getCount());
-		assertEquals(0x000001cbba6dbcd0L, checksum.getChecksum());
+		assertEquals(902, results.size());
+
+		Map<Long, Tuple2<Double, Double>> expectedResults = new HashMap<>();
+		// a pseudo-random selection of results, both high and low
+		expectedResults.put(0L, Tuple2.of(0.231077034747, 0.238110214937));
+		expectedResults.put(1L, Tuple2.of(0.162364053933, 0.169679504287));
+		expectedResults.put(2L, Tuple2.of(0.162412612499, 0.161015667261));
+		expectedResults.put(8L, Tuple2.of(0.167064641724, 0.158592966505));
+		expectedResults.put(13L, Tuple2.of(0.041915595624, 0.0407091625629));
+		expectedResults.put(29L, Tuple2.of(0.0102017346511, 0.0146218045999));
+		expectedResults.put(109L, Tuple2.of(0.00190531000389, 0.00481944993023));
+		expectedResults.put(394L, Tuple2.of(0.0122287016161, 0.0147987969538));
+		expectedResults.put(652L, Tuple2.of(0.010966659242, 0.0113713306749));
+		expectedResults.put(1020L, Tuple2.of(0.0, 0.000326973732127));
+
+		for (Map.Entry<Long, Tuple2<Double, Double>> expected : expectedResults.entrySet()) {
+			double hubScore = results.get(expected.getKey()).getHubScore().getValue();
+			double authorityScore = results.get(expected.getKey()).getAuthorityScore().getValue();
+
+			assertEquals(expected.getValue().f0, hubScore, 0.00001);
+			assertEquals(expected.getValue().f1, authorityScore, 0.00001);
+		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea14053f/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/PageRankTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/PageRankTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/PageRankTest.java
new file mode 100644
index 0000000..082b6ad
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/link_analysis/PageRankTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.link_analysis;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.dataset.Collect;
+import org.apache.flink.graph.library.link_analysis.PageRank.Result;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.Assert.assertEquals;
+
+public class PageRankTest
+extends AsmTestBase {
+
+	private static final double DAMPING_FACTOR = 0.85;
+
+	/*
+	 * This test result can be verified with the following Python script.
+
+	import networkx as nx
+
+	graph=nx.read_edgelist('directedSimpleGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	pagerank=nx.algorithms.link_analysis.pagerank(graph)
+
+	for key in sorted(pagerank):
+		print('{}: {}'.format(key, pagerank[key]))
+	 */
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		DataSet<Result<IntValue>> pr = new PageRank<IntValue, NullValue, NullValue>(DAMPING_FACTOR, 10)
+			.run(directedSimpleGraph);
+
+		List<Double> expectedResults = new ArrayList<>();
+		expectedResults.add(0.09091296131286301);
+		expectedResults.add(0.27951855944178117);
+		expectedResults.add(0.12956847924535586);
+		expectedResults.add(0.22329643739217675);
+		expectedResults.add(0.18579060129496028);
+		expectedResults.add(0.09091296131286301);
+
+		for (Tuple2<IntValue, DoubleValue> result : pr.collect()) {
+			int id = result.f0.getValue();
+			assertEquals(expectedResults.get(id), result.f1.getValue(), 0.000001);
+		}
+	}
+
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		double expectedScore = 1.0 / completeGraphVertexCount;
+
+		DataSet<Result<LongValue>> pr = new PageRank<LongValue, NullValue, NullValue>(DAMPING_FACTOR, 0.000001)
+			.run(completeGraph);
+
+		List<Result<LongValue>> results = pr.collect();
+
+		assertEquals(completeGraphVertexCount, results.size());
+
+		for (Tuple2<LongValue, DoubleValue> result : results) {
+			assertEquals(expectedScore, result.f1.getValue(), 0.000001);
+		}
+	}
+
+	/*
+	 * This test result can be verified with the following Python script.
+
+	import networkx as nx
+
+	graph=nx.read_edgelist('directedRMatGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	pagerank=nx.algorithms.link_analysis.pagerank(graph)
+
+	for key in [0, 1, 2, 8, 13, 29, 109, 394, 652, 1020]:
+		print('{}: {}'.format(key, pagerank[str(key)]))
+	 */
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		DataSet<Result<LongValue>> pr = new PageRank<LongValue, NullValue, NullValue>(DAMPING_FACTOR, 0.000001)
+			.run(directedRMatGraph);
+
+		Map<Long, Result<LongValue>> results = new HashMap<>();
+		for (Result<LongValue> result :  new Collect<Result<LongValue>>().run(pr).execute()) {
+			results.put(result.getVertexId0().getValue(), result);
+		}
+
+		assertEquals(902, results.size());
+
+		Map<Long, Double> expectedResults = new HashMap<>();
+		// a pseudo-random selection of results, both high and low
+		expectedResults.put(0L, 0.027111807822);
+		expectedResults.put(1L, 0.0132842310382);
+		expectedResults.put(2L, 0.0121818392504);
+		expectedResults.put(8L, 0.0115916809743);
+		expectedResults.put(13L, 0.00183249490033);
+		expectedResults.put(29L, 0.000848095047082);
+		expectedResults.put(109L, 0.000308507844048);
+		expectedResults.put(394L, 0.000828743280246);
+		expectedResults.put(652L, 0.000684102931253);
+		expectedResults.put(1020L, 0.000250487135148);
+
+		for (Map.Entry<Long, Double> expected : expectedResults.entrySet()) {
+			double value = results.get(expected.getKey()).getPageRankScore().getValue();
+
+			assertEquals(expected.getValue(), value, 0.00001);
+		}
+	}
+}


[2/2] flink git commit: [FLINK-5597] [docs] Improve the LocalClusteringCoefficient documentation

Posted by gr...@apache.org.
[FLINK-5597] [docs] Improve the LocalClusteringCoefficient documentation

Update the documentation for Gelly's library methods with improved
algorithm descriptions and explanation of algorithm results.

This closes #3404


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cb9e409b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cb9e409b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cb9e409b

Branch: refs/heads/master
Commit: cb9e409b764f95e07441a0c8da6c24e21bc1564b
Parents: ea14053
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Feb 23 16:26:45 2017 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Mar 2 16:42:58 2017 -0500

----------------------------------------------------------------------
 docs/dev/libs/gelly/library_methods.md          | 92 ++++++++++----------
 .../directed/LocalClusteringCoefficient.java    |  4 +-
 .../clustering/directed/TriadicCensus.java      |  2 +-
 .../clustering/directed/TriangleListing.java    |  8 +-
 .../undirected/LocalClusteringCoefficient.java  |  4 +-
 .../clustering/undirected/TriadicCensus.java    |  4 +-
 .../clustering/undirected/TriangleListing.java  | 15 ++--
 .../flink/graph/library/link_analysis/HITS.java |  4 +-
 .../graph/library/similarity/AdamicAdar.java    |  6 +-
 .../graph/library/similarity/JaccardIndex.java  | 10 +--
 .../graph/utils/proxy/OptionalBoolean.java      |  2 +-
 11 files changed, 79 insertions(+), 72 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cb9e409b/docs/dev/libs/gelly/library_methods.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/library_methods.md b/docs/dev/libs/gelly/library_methods.md
index 94eee2e..026bea4 100644
--- a/docs/dev/libs/gelly/library_methods.md
+++ b/docs/dev/libs/gelly/library_methods.md
@@ -166,18 +166,6 @@ The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-app
 
 See the [Single Source Shortest Paths](#single-source-shortest-paths) library method for implementation details and usage information.
 
-## Triangle Count
-
-#### Overview
-An analytic for counting the number of unique triangles in a graph.
-
-#### Details
-Counts the triangles generated by [Triangle Listing](#triangle-listing).
-
-#### Usage
-The analytic takes an undirected graph as input and returns as a result a `Long` corresponding to the number of triangles
-in the graph. The graph ID type must be `Comparable` and `Copyable`.
-
 ## Triangle Enumerator
 
 #### Overview
@@ -229,11 +217,12 @@ neighbors) to 1.0 (complete graph).
 
 #### Details
 See the [Local Clustering Coefficient](#local-clustering-coefficient) library method for a detailed explanation of
-clustering coefficient.
+clustering coefficient. The Average Clustering Coefficient is the average of the Local Clustering Coefficient scores
+over all vertices with at least two neighbors. Each vertex, independent of degree, has equal weight for this score.
 
 #### Usage
-Directed and undirected variants are provided. The algorithm takes a simple graph as input and outputs a result
-containing the total number of vertices and average local clustering coefficient of the graph. The graph ID type must be
+Directed and undirected variants are provided. The analytics take a simple graph as input and output an `AnalyticResult`
+containing the total number of vertices and average clustering coefficient of the graph. The graph ID type must be
 `Comparable` and `Copyable`.
 
 * `setLittleParallelism`: override the parallelism of operators processing small amounts of data
@@ -246,12 +235,14 @@ neighbors) to 1.0 (complete graph).
 
 #### Details
 See the [Local Clustering Coefficient](#local-clustering-coefficient) library method for a detailed explanation of
-clustering coefficient.
+clustering coefficient. The Global Clustering Coefficient is the ratio of connected neighbors over the entire graph.
+Vertices with higher degrees have greater weight for this score because the count of neighbor pairs is quadratic in
+degree.
 
 #### Usage
-Directed and undirected variants are provided. The algorithm takes a simple graph as input and outputs a result
-containing the total number of triplets and triangles in the graph. The graph ID type must be `Comparable` and
-`Copyable`.
+Directed and undirected variants are provided. The analytics take a simple graph as input and output an `AnalyticResult`
+containing the total number of triplets and triangles in the graph. The result class provides a method to compute the
+global clustering coefficient score. The graph ID type must be `Comparable` and `Copyable`.
 
 * `setLittleParallelism`: override the parallelism of operators processing small amounts of data
 
@@ -262,19 +253,20 @@ The local clustering coefficient measures the connectedness of each vertex's nei
 edges between neighbors) to 1.0 (neighborhood is a clique).
 
 #### Details
-An edge between a vertex's neighbors is a triangle. Counting edges between neighbors is equivalent to counting the
+An edge between neighbors of a vertex is a triangle. Counting edges between neighbors is equivalent to counting the
 number of triangles which include the vertex. The clustering coefficient score is the number of edges between neighbors
 divided by the number of potential edges between neighbors.
 
-See the [Triangle Enumerator](#triangle-enumerator) library method for a detailed explanation of triangle enumeration.
+See the [Triangle Listing](#triangle-listing) library method for a detailed explanation of triangle enumeration.
 
 #### Usage
-Directed and undirected variants are provided. The algorithms take a simple graph as input and outputs a `DataSet` of
-tuples containing the vertex ID, vertex degree, and number of triangles containing the vertex. The graph ID type must be
-`Comparable` and `Copyable`.
+Directed and undirected variants are provided. The algorithms take a simple graph as input and output a `DataSet` of
+`UnaryResult` containing the vertex ID, vertex degree, and number of triangles containing the vertex. The result class
+provides a method to compute the local clustering coefficient score. The graph ID type must be `Comparable` and
+`Copyable`.
 
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
 * `setIncludeZeroDegreeVertices`: include results for vertices with a degree of zero
+* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
 
 ### Triadic Census
 
@@ -284,29 +276,35 @@ or unconnected. The [Triadic Census](http://vlado.fmf.uni-lj.si/pub/networks/doc
 occurrences of each type of triad with the graph.
 
 #### Details
-The analytics counts the four undirected triad types (formed with 0, 1, 2, or 3 connecting edges) or 16 directed triad
+This analytic counts the four undirected triad types (formed with 0, 1, 2, or 3 connecting edges) or 16 directed triad
 types by counting the triangles from [Triangle Listing](#triangle-listing) and running [Vertex Metrics](#vertex-metrics)
 to obtain the number of triplets and edges. Triangle counts are then deducted from triplet counts, and triangle and
 triplet counts are removed from edge counts.
 
 #### Usage
-Directed and undirected variants are provided. The analytics take a simple graph as input and outputs a `Result` with
-accessor methods for the computed statistics. The graph ID type must be `Comparable` and `Copyable`.
+Directed and undirected variants are provided. The analytics take a simple graph as input and output an
+`AnalyticResult` with accessor methods for querying the count of each triad type. The graph ID type must be
+`Comparable` and `Copyable`.
 
 * `setLittleParallelism`: override the parallelism of operators processing small amounts of data
 
 ### Triangle Listing
 
 #### Overview
-This algorithm supports object reuse. The graph ID type must be `Comparable` and `Copyable`.
+Enumerates all triangles in the graph. A triangle is composed of three edges connecting three vertices into cliques of
+size 3.
 
 #### Details
-See the [Triangle Enumerator](#triangle-enumerator) library method for implementation details.
+Triangles are listed by joining open triplets (two edges with a common neighbor) against edges on the triplet endpoints.
+This implementation uses optimizations from
+[Schank's algorithm](http://i11www.iti.uni-karlsruhe.de/extra/publications/sw-fclt-05_t.pdf) to improve performance with
+high-degree vertices. Triplets are generated from the lowest degree vertex since each triangle need only be listed once.
+This greatly reduces the number of generated triplets which is quadratic in vertex degree.
 
 #### Usage
-Directed and undirected variants are provided. The algorithms take a simple graph as input and outputs a `DataSet` of
-tuples containing the three triangle vertices and, for the directed algorithm, a bitmask marking each of the six
-potential triangle edges. The graph ID type must be `Comparable` and `Copyable`.
+Directed and undirected variants are provided. The algorithms take a simple graph as input and output a `DataSet` of
+`TertiaryResult` containing the three triangle vertices and, for the directed algorithm, a bitmask marking each of the
+six potential edges connecting the three vertices. The graph ID type must be `Comparable` and `Copyable`.
 
 * `setLittleParallelism`: override the parallelism of operators processing small amounts of data
 * `setSortTriangleVertices`: normalize the triangle listing such that for each result (K0, K1, K2) the vertex IDs are sorted K0 < K1 < K2
@@ -324,11 +322,13 @@ good authorities and good authorities are those pointed to by many good hubs.
 Every vertex is assigned the same initial hub and authority scores. The algorithm then iteratively updates the scores
 until termination. During each iteration new hub scores are computed from the authority scores, then new authority
 scores are computed from the new hub scores. The scores are then normalized and optionally tested for convergence.
+HITS is similar to [PageRank](#pagerank) but vertex scores are emitted in full to each neighbor whereas in PageRank
+the vertex score is first divided by the number of neighbors.
 
 #### Usage
-The algorithm takes a directed graph as input and outputs a `DataSet` of `Tuple3` containing the vertex ID, hub score,
-and authority score. Termination is configured with a maximum number of iterations and/or a convergence threshold
-on the sum of the change in each score for each vertex between iterations.
+The algorithm takes a simple directed graph as input and outputs a `DataSet` of `UnaryResult` containing the vertex ID,
+hub score, and authority score. Termination is configured by the number of iterations and/or a convergence threshold on
+the iteration sum of the change in scores over all vertices.
 
 * `setParallelism`: override the operator parallelism
 
@@ -377,8 +377,8 @@ The statistics are computed over vertex degrees generated from `degree.annotate.
 `degree.annotate.undirected.VertexDegree`.
 
 #### Usage
-Directed and undirected variants are provided. The analytics take a simple graph as input and output a `Result` with
-accessor methods for the computed statistics. The graph ID type must be `Comparable`.
+Directed and undirected variants are provided. The analytics take a simple graph as input and output an `AnalyticResult`
+with accessor methods for the computed statistics. The graph ID type must be `Comparable`.
 
 * `setIncludeZeroDegreeVertices`: include results for vertices with a degree of zero
 * `setParallelism`: override the operator parallelism
@@ -398,8 +398,8 @@ The statistics are computed over edge degrees generated from `degree.annotate.di
 `degree.annotate.undirected.EdgeDegreePair` and grouped by vertex.
 
 #### Usage
-Directed and undirected variants are provided. The analytics take a simple graph as input and output a `Result` with
-accessor methods for the computed statistics. The graph ID type must be `Comparable`.
+Directed and undirected variants are provided. The analytics take a simple graph as input and output an `AnalyticResult`
+with accessor methods for the computed statistics. The graph ID type must be `Comparable`.
 
 * `setParallelism`: override the operator parallelism
 * `setReduceOnTargetId` (undirected only): the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID
@@ -416,13 +416,13 @@ influential to each pair of neighbors.
 #### Details
 The algorithm first annotates each vertex with the inverse of the logarithm of the vertex degree then joins this score
 onto edges by source vertex. Grouping on the source vertex, each pair of neighbors is emitted with the vertex score.
-Grouping on two-paths, the Adamic-Adar score is summed.
+Grouping on vertex pairs, the Adamic-Adar score is summed.
 
 See the [Jaccard Index](#jaccard-index) library method for a similar algorithm.
 
 #### Usage
-The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing two vertex IDs and
-the Adamic-Adair similarity score. The graph ID type must be `Comparable` and `Copyable`.
+The algorithm takes a simple undirected graph as input and outputs a `DataSet` of `BinaryResult` containing two vertex
+IDs and the Adamic-Adar similarity score. The graph ID type must be `Copyable`.
 
 * `setLittleParallelism`: override the parallelism of operators processing small amounts of data
 * `setMinimumRatio`: filter out Adamic-Adar scores less than the given ratio times the average score
@@ -441,12 +441,12 @@ distinct neighbors is computed by storing the sum of degrees of the vertex pair
 neighbors, which are double-counted in the sum of degrees.
 
 The algorithm first annotates each edge with the target vertex's degree. Grouping on the source vertex, each pair of
-neighbors is emitted with the degree sum. Grouping on two-paths, the shared neighbors are counted.
+neighbors is emitted with the degree sum. Grouping on vertex pairs, the shared neighbors are counted.
 
 #### Usage
-The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing two vertex IDs,
+The algorithm takes a simple undirected graph as input and outputs a `DataSet` of tuples containing two vertex IDs,
 the number of shared neighbors, and the number of distinct neighbors. The result class provides a method to compute the
-Jaccard Index score. The graph ID type must be `Comparable` and `Copyable`.
+Jaccard Index score. The graph ID type must be `Copyable`.
 
 * `setLittleParallelism`: override the parallelism of operators processing small amounts of data
 * `setMaximumScore`: filter out Jaccard Index scores greater than or equal to the given maximum fraction

http://git-wip-us.apache.org/repos/asf/flink/blob/cb9e409b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index cad36b3..ffd4b13 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -44,11 +44,11 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * The local clustering coefficient measures the connectedness of each vertex's
  * neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0
  * (neighborhood is a clique).
- * <br/>
+ * <p>
  * An edge between a vertex's neighbors is a triangle. Counting edges between
  * neighbors is equivalent to counting the number of triangles which include
  * the vertex.
- * <br/>
+ * <p>
  * The input graph must be a simple graph containing no duplicate edges or
  * self-loops.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/cb9e409b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
index 6c9e7b6..2274e3e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
@@ -40,7 +40,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 /**
  * A triad is formed by three connected or unconnected vertices in a graph.
  * The triadic census counts the occurrences of each type of triad.
- * <br/>
+ * <p>
  * http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf
  *
  * @param <K> graph ID type

http://git-wip-us.apache.org/repos/asf/flink/blob/cb9e409b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index a1006c4..6b3e2a1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -51,11 +51,15 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Generates a listing of distinct triangles from the input graph.
- * <br/>
+ * <p>
  * A triangle is a 3-clique with vertices A, B, and C connected by edges
  * (A, B), (A, C), and (B, C).
- * <br/>
+ * <p>
  * The input graph must not contain duplicate edges or self-loops.
+ * <p>
+ * This algorithm is similar to the undirected version but also tracks and
+ * computes a bitmask representing the six potential graph edges connecting
+ * the triangle vertices.
  *
  * @param <K> graph ID type
  * @param <VV> vertex value type

http://git-wip-us.apache.org/repos/asf/flink/blob/cb9e409b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 99e3db9..31ddf45 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -44,11 +44,11 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * The local clustering coefficient measures the connectedness of each vertex's
  * neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0
  * (neighborhood is a clique).
- * <br/>
+ * <p>
  * An edge between a vertex's neighbors is a triangle. Counting edges between
  * neighbors is equivalent to counting the number of triangles which include
  * the vertex.
- * <br/>
+ * <p>
  * The input graph must be a simple, undirected graph containing no duplicate
  * edges or self-loops.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/cb9e409b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
index 3f59d0e..7482af0 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
@@ -38,10 +38,10 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 /**
  * A triad is formed by three connected or unconnected vertices in a graph.
  * The triadic census counts the occurrences of each type of triad.
- * <br/>
+ * <p>
  * The four types of undirected triads are formed with 0, 1, 2, or 3
  * connecting edges.
- * <br/>
+ * <p>
  * http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf
  *
  * @param <K> graph ID type

http://git-wip-us.apache.org/repos/asf/flink/blob/cb9e409b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 8850125..09b9a5d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -48,15 +48,16 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Generates a listing of distinct triangles from the input graph.
- * <br/>
+ * <p>
  * A triangle is a 3-cycle with vertices A, B, and C connected by edges
  * (A, B), (A, C), and (B, C).
- * <br/>
+ * <p>
  * The input graph must be a simple, undirected graph containing no duplicate
  * edges or self-loops.
- * <br/>
- * Algorithm from "Graph Twiddling in a MapReduce World", J. D. Cohen,
- * http://lintool.github.io/UMD-courses/bigdata-2015-Spring/content/Cohen_2009.pdf
+ * <p>
+ * Algorithm from "Finding, Counting and Listing all Triangles in Large Graphs,
+ * An Experimental Study", Thomas Schank and Dorothea Wagner.
+ * http://i11www.iti.uni-karlsruhe.de/extra/publications/sw-fclt-05_t.pdf
  *
  * @param <K> graph ID type
  * @param <VV> vertex value type
@@ -178,7 +179,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> {
 	/**
 	 * Removes edge values while filtering such that only edges where the
 	 * source vertex ID compares less than the target vertex ID are emitted.
-	 * <br/>
+	 * <p>
 	 * Since the input graph is a simple graph this filter removes exactly half
 	 * of the original edges.
 	 *
@@ -206,7 +207,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Tuple3<K, K, K>> {
 	 * vertex has lower degree are emitted. If the source and target vertex
 	 * degrees are equal then the edge is emitted if the source vertex ID
 	 * compares less than the target vertex ID.
-	 * <br/>
+	 * <p>
 	 * Since the input graph is a simple graph this filter removes exactly half
 	 * of the original edges.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/cb9e409b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
index ecc1ad7..f4195f7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
@@ -53,9 +53,11 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * Hyperlink-Induced Topic Search computes two interdependent scores for every
  * vertex in a directed graph. A good "hub" links to good "authorities" and
  * good "authorities" are linked from good "hubs".
- *
+ * <p>
  * This algorithm can be configured to terminate either by a limit on the number
  * of iterations, a convergence threshold, or both.
+ * <p>
+ * http://www.cs.cornell.edu/home/kleinber/auth.pdf
  *
  * http://www.cs.cornell.edu/home/kleinber/auth.pdf
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/cb9e409b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 00819e4..a7ba00a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -53,16 +53,16 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
- * <br/>
+ * <p>
  * Adamic-Adar measures the similarity between pairs of vertices as the sum of
  * the inverse logarithm of degree over shared neighbors. Scores are non-negative
  * and unbounded. A vertex with higher degree has greater overall influence but
  * is less influential to each pair of neighbors.
- * <br/>
+ * <p>
  * This implementation produces similarity scores for each pair of vertices
  * in the graph with at least one shared neighbor; equivalently, this is the
  * set of all non-zero Adamic-Adar coefficients.
- * <br/>
+ * <p>
  * The input graph must be a simple, undirected graph containing no duplicate
  * edges or self-loops.
  *

http://git-wip-us.apache.org/repos/asf/flink/blob/cb9e409b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 148d541..11ec73d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -48,11 +48,11 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * is computed as the number of shared neighbors divided by the number of
  * distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all
  * neighbors are shared).
- * <br/>
+ * <p>
  * This implementation produces similarity scores for each pair of vertices
  * in the graph with at least one shared neighbor; equivalently, this is the
  * set of all non-zero Jaccard Similarity coefficients.
- * <br/>
+ * <p>
  * The input graph must be a simple, undirected graph containing no duplicate
  * edges or self-loops.
  *
@@ -240,10 +240,10 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 * This is the first of three operations implementing a self-join to generate
 	 * the full neighbor pairing for each vertex. The number of neighbor pairs
 	 * is (n choose 2) which is quadratic in the vertex degree.
-	 * <br/>
+	 * <p>
 	 * The third operation, {@link GenerateGroupPairs}, processes groups of size
 	 * {@link #groupSize} and emits {@code O(groupSize * deg(vertex))} pairs.
-	 * <br/>
+	 * <p>
 	 * This input to the third operation is still quadratic in the vertex degree.
 	 * Two prior operations, {@link GenerateGroupSpans} and {@link GenerateGroups},
 	 * each emit datasets linear in the vertex degree, with a forced rebalance
@@ -318,7 +318,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	/**
 	 * Emits the two-path for all neighbor pairs in this group.
-	 * <br/>
+	 * <p>
 	 * The first {@link #groupSize} vertices are emitted pairwise. Following
 	 * vertices are only paired with vertices from this initial group.
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/cb9e409b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
index 70a1294..7a7208a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/OptionalBoolean.java
@@ -22,7 +22,7 @@ import org.apache.flink.graph.GraphAlgorithm;
 
 /**
  * A multi-state boolean.
- * <br/>
+ * <p>
  * This class is used by {@link GraphAlgorithm} configuration options to set a
  * default value which can be overwritten. The default value is also used when
  * algorithm configurations are merged and conflict.