You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 19:16:17 UTC

[08/15] flink git commit: [FLINK-6709] [gelly] Activate strict checkstyle for flink-gellies

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
index 5f28605..949dd4c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.graph.library.clustering.directed;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.common.accumulators.LongCounter;
-import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
@@ -32,6 +30,9 @@ import org.apache.flink.graph.library.clustering.directed.TriadicCensus.Result;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import java.io.IOException;
 import java.math.BigInteger;
 import java.text.NumberFormat;
@@ -41,15 +42,15 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 /**
  * A triad is formed by three connected or unconnected vertices in a graph.
  * The triadic census counts the occurrences of each type of triad.
- * <p>
- * http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf
+ *
+ * <p>See http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf
  *
  * @param <K> graph ID type
  * @param <VV> vertex value type
  * @param <EV> edge value type
  */
 public class TriadicCensus<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private TriangleListingHelper<K> triangleListingHelper;
 
@@ -101,24 +102,24 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		BigInteger three = BigInteger.valueOf(3);
 		BigInteger six = BigInteger.valueOf(6);
 
-		BigInteger vertexCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "vc"));
-		BigInteger unidirectionalEdgeCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "uec") / 2);
-		BigInteger bidirectionalEdgeCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "bec") / 2);
-		BigInteger triplet021dCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "021d"));
-		BigInteger triplet021uCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "021u"));
-		BigInteger triplet021cCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "021c"));
-		BigInteger triplet111dCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "111d"));
-		BigInteger triplet111uCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "111u"));
-		BigInteger triplet201Count = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "201"));
+		BigInteger vertexCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "vc"));
+		BigInteger unidirectionalEdgeCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "uec") / 2);
+		BigInteger bidirectionalEdgeCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "bec") / 2);
+		BigInteger triplet021dCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "021d"));
+		BigInteger triplet021uCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "021u"));
+		BigInteger triplet021cCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "021c"));
+		BigInteger triplet111dCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "111d"));
+		BigInteger triplet111uCount = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "111u"));
+		BigInteger triplet201Count = BigInteger.valueOf((Long) vertexDegreesHelper.getAccumulator(env, "201"));
 
 		// triads with three connecting edges = closed triplet = triangle
-		BigInteger triangle030tCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "030t"));
-		BigInteger triangle030cCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "030c"));
-		BigInteger triangle120dCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "120d"));
-		BigInteger triangle120uCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "120u"));
-		BigInteger triangle120cCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "120c"));
-		BigInteger triangle210Count = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "210"));
-		BigInteger triangle300Count = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "300"));
+		BigInteger triangle030tCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "030t"));
+		BigInteger triangle030cCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "030c"));
+		BigInteger triangle120dCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "120d"));
+		BigInteger triangle120uCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "120u"));
+		BigInteger triangle120cCount = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "120c"));
+		BigInteger triangle210Count = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "210"));
+		BigInteger triangle300Count = BigInteger.valueOf((Long) triangleListingHelper.getAccumulator(env, "300"));
 
 		// triads with two connecting edges = open triplet;
 		// each triangle deducts the count of three triplets
@@ -236,7 +237,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 			long triangle210Count = 0;
 			long triangle300tCount = 0;
 
-			for (int i = 0 ; i < typeTable.length ; i++) {
+			for (int i = 0; i < typeTable.length; i++) {
 				if (typeTable[i] == 9) {
 					triangle030tCount += triangleCount[i];
 				} else if (typeTable[i] == 10) {
@@ -509,7 +510,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		/**
 		 * Get the array of counts.
 		 *
-		 * The order of the counts is from least to most connected:
+		 * <p>The order of the counts is from least to most connected:
 		 *   003, 012, 102, 021d, 021u, 021c, 111d, 111u,
 		 *   030t, 030c, 201, 120d, 120u, 120c, 210, 300
 		 *
@@ -550,11 +551,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 		@Override
 		public boolean equals(Object obj) {
-			if (obj == null) { return false; }
-			if (obj == this) { return true; }
-			if (obj.getClass() != getClass()) { return false; }
+			if (obj == null) {
+				return false;
+			}
+
+			if (obj == this) {
+				return true;
+			}
+
+			if (obj.getClass() != getClass()) {
+				return false;
+			}
 
-			Result rhs = (Result)obj;
+			Result rhs = (Result) obj;
 
 			return new EqualsBuilder()
 				.append(counts, rhs.counts)

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 582c4b5..38b0746 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -53,13 +53,13 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Generates a listing of distinct triangles from the input graph.
- * <p>
- * A triangle is a 3-clique with vertices A, B, and C connected by edges
+ *
+ * <p>A triangle is a 3-clique with vertices A, B, and C connected by edges
  * (A, B), (A, C), and (B, C).
- * <p>
- * The input graph must not contain duplicate edges or self-loops.
- * <p>
- * This algorithm is similar to the undirected version but also tracks and
+ *
+ * <p>The input graph must not contain duplicate edges or self-loops.
+ *
+ * <p>This algorithm is similar to the undirected version but also tracks and
  * computes a bitmask representing the six potential graph edges connecting
  * the triangle vertices.
  *
@@ -112,7 +112,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! TriangleListing.class.isAssignableFrom(other.getClass())) {
+		if (!TriangleListing.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 
@@ -258,9 +258,9 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 */
 	private static final class OrderByDegree<T extends Comparable<T>, ET>
 	implements MapFunction<Edge<T, Tuple3<ET, Degrees, Degrees>>, Tuple3<T, T, ByteValue>> {
-		private ByteValue forward = new ByteValue((byte)(EdgeOrder.FORWARD.getBitmask() << 2));
+		private ByteValue forward = new ByteValue((byte) (EdgeOrder.FORWARD.getBitmask() << 2));
 
-		private ByteValue reverse = new ByteValue((byte)(EdgeOrder.REVERSE.getBitmask() << 2));
+		private ByteValue reverse = new ByteValue((byte) (EdgeOrder.REVERSE.getBitmask() << 2));
 
 		private Tuple3<T, T, ByteValue> output = new Tuple3<>();
 
@@ -319,17 +319,17 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 					Tuple2<T, ByteValue> previous = visited.get(i);
 
 					output.f1 = previous.f0;
-					output.f3.setValue((byte)(previous.f1.getValue() | bitmask));
+					output.f3.setValue((byte) (previous.f1.getValue() | bitmask));
 
 					// u, v, w, bitmask
 					out.collect(output);
 				}
 
-				if (! iter.hasNext()) {
+				if (!iter.hasNext()) {
 					break;
 				}
 
-				byte shiftedBitmask = (byte)(bitmask << 2);
+				byte shiftedBitmask = (byte) (bitmask << 2);
 
 				if (visitedCount == visited.size()) {
 					visited.add(new Tuple2<>(edge.f1.copy(), new ByteValue(shiftedBitmask)));
@@ -361,7 +361,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			output.f0 = triplet.f0;
 			output.f1 = triplet.f1;
 			output.f2 = triplet.f2;
-			output.f3.setValue((byte)(triplet.f3.getValue() | edge.f2.getValue()));
+			output.f3.setValue((byte) (triplet.f3.getValue() | edge.f2.getValue()));
 			return output;
 		}
 	}
@@ -381,26 +381,26 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			if (value.f0.compareTo(value.f1) > 0) {
 				byte bitmask = value.f3.getValue();
 
-				T temp_val = value.f0;
+				T tempVal = value.f0;
 				value.f0 = value.f1;
 
-				if (temp_val.compareTo(value.f2) < 0) {
-					value.f1 = temp_val;
+				if (tempVal.compareTo(value.f2) < 0) {
+					value.f1 = tempVal;
 
 					int f0f1 = ((bitmask & 0b100000) >>> 1) | ((bitmask & 0b010000) << 1);
 					int f0f2 = (bitmask & 0b001100) >>> 2;
 					int f1f2 = (bitmask & 0b000011) << 2;
 
-					value.f3.setValue((byte)(f0f1 | f0f2 | f1f2));
+					value.f3.setValue((byte) (f0f1 | f0f2 | f1f2));
 				} else {
 					value.f1 = value.f2;
-					value.f2 = temp_val;
+					value.f2 = tempVal;
 
 					int f0f1 = (bitmask & 0b000011) << 4;
 					int f0f2 = ((bitmask & 0b100000) >>> 3) | ((bitmask & 0b010000) >>> 1);
 					int f1f2 = ((bitmask & 0b001000) >>> 3) | ((bitmask & 0b000100) >>> 1);
 
-					value.f3.setValue((byte)(f0f1 | f0f2 | f1f2));
+					value.f3.setValue((byte) (f0f1 | f0f2 | f1f2));
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
index e01892b..c2c2ad2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
@@ -18,18 +18,19 @@
 
 package org.apache.flink.graph.library.clustering.undirected;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.common.accumulators.DoubleCounter;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient.Result;
 import org.apache.flink.types.CopyableValue;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import java.io.IOException;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
@@ -43,7 +44,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class AverageClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private static final String VERTEX_COUNT = "vertexCount";
 
@@ -181,11 +182,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 		@Override
 		public boolean equals(Object obj) {
-			if (obj == null) { return false; }
-			if (obj == this) { return true; }
-			if (obj.getClass() != getClass()) { return false; }
+			if (obj == null) {
+				return false;
+			}
+
+			if (obj == this) {
+				return true;
+			}
+
+			if (obj.getClass() != getClass()) {
+				return false;
+			}
 
-			Result rhs = (Result)obj;
+			Result rhs = (Result) obj;
 
 			return new EqualsBuilder()
 				.append(vertexCount, rhs.vertexCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
index 2eac620..5377c1f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
@@ -18,17 +18,18 @@
 
 package org.apache.flink.graph.library.clustering.undirected;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.asm.dataset.Count;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result;
 import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
 import org.apache.flink.types.CopyableValue;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
@@ -40,7 +41,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class GlobalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private Count<TriangleListing.Result<K>> triangleCount;
 
@@ -141,13 +142,13 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		 * number of closed triplets (triangles) divided by the total number of
 		 * triplets.
 		 *
-		 * A score of {@code Double.NaN} is returned for a graph of isolated vertices
+		 * <p>A score of {@code Double.NaN} is returned for a graph of isolated vertices
 		 * for which both the triangle count and number of neighbors are zero.
 		 *
 		 * @return global clustering coefficient score
 		 */
 		public double getGlobalClusteringCoefficientScore() {
-			return (tripletCount == 0) ? Double.NaN : triangleCount / (double)tripletCount;
+			return (tripletCount == 0) ? Double.NaN : triangleCount / (double) tripletCount;
 		}
 
 		@Override
@@ -167,11 +168,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 		@Override
 		public boolean equals(Object obj) {
-			if (obj == null) { return false; }
-			if (obj == this) { return true; }
-			if (obj.getClass() != getClass()) { return false; }
+			if (obj == null) {
+				return false;
+			}
+
+			if (obj == this) {
+				return true;
+			}
+
+			if (obj.getClass() != getClass()) {
+				return false;
+			}
 
-			Result rhs = (Result)obj;
+			Result rhs = (Result) obj;
 
 			return new EqualsBuilder()
 				.append(tripletCount, rhs.tripletCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 10f7aba..e94310f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -34,7 +34,7 @@ import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.asm.result.UnaryResult;
 import org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient.Result;
-import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.MurmurHash;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
@@ -48,12 +48,12 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * The local clustering coefficient measures the connectedness of each vertex's
  * neighborhood. Scores range from 0.0 (no edges between neighbors) to 1.0
  * (neighborhood is a clique).
- * <p>
- * An edge between a vertex's neighbors is a triangle. Counting edges between
+ *
+ * <p>An edge between a vertex's neighbors is a triangle. Counting edges between
  * neighbors is equivalent to counting the number of triangles which include
  * the vertex.
- * <p>
- * The input graph must be a simple, undirected graph containing no duplicate
+ *
+ * <p>The input graph must be a simple, undirected graph containing no duplicate
  * edges or self-loops.
  *
  * @param <K> graph ID type
@@ -107,7 +107,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
+		if (!LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 
@@ -247,7 +247,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	implements PrintableResult, UnaryResult<T> {
 		private static final int HASH_SEED = 0xc23937c1;
 
-		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+		private MurmurHash hasher = new MurmurHash(HASH_SEED);
 
 		@Override
 		public T getVertexId0() {
@@ -283,7 +283,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		 * number of edges between neighbors, equal to the triangle count,
 		 * divided by the number of potential edges between neighbors.
 		 *
-		 * A score of {@code Double.NaN} is returned for a vertex with degree 1
+		 * <p>A score of {@code Double.NaN} is returned for a vertex with degree 1
 		 * for which both the triangle count and number of neighbors are zero.
 		 *
 		 * @return local clustering coefficient score
@@ -292,7 +292,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			long degree = getDegree().getValue();
 			long neighborPairs = degree * (degree - 1) / 2;
 
-			return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double)neighborPairs;
+			return (neighborPairs == 0) ? Double.NaN : getTriangleCount().getValue() / (double) neighborPairs;
 		}
 
 		/**

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
index 604621d..c5a323d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
@@ -18,11 +18,9 @@
 
 package org.apache.flink.graph.library.clustering.undirected;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.asm.dataset.Count;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.clustering.undirected.TriadicCensus.Result;
@@ -30,6 +28,9 @@ import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.util.Preconditions;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import java.math.BigInteger;
 import java.text.NumberFormat;
 
@@ -38,18 +39,18 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 /**
  * A triad is formed by three connected or unconnected vertices in a graph.
  * The triadic census counts the occurrences of each type of triad.
- * <p>
- * The four types of undirected triads are formed with 0, 1, 2, or 3
+ *
+ * <p>The four types of undirected triads are formed with 0, 1, 2, or 3
  * connecting edges.
- * <p>
- * http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf
+ *
+ * <p>See http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf
  *
  * @param <K> graph ID type
  * @param <VV> vertex value type
  * @param <EV> edge value type
  */
 public class TriadicCensus<K extends Comparable<K> & CopyableValue<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private Count<TriangleListing.Result<K>> triangleCount;
 
@@ -203,7 +204,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		/**
 		 * Get the array of counts.
 		 *
-		 * The order of the counts is from least to most connected:
+		 * <p>The order of the counts is from least to most connected:
 		 *   03, 12, 21, 30
 		 *
 		 * @return array of counts
@@ -231,11 +232,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 		@Override
 		public boolean equals(Object obj) {
-			if (obj == null) { return false; }
-			if (obj == this) { return true; }
-			if (obj.getClass() != getClass()) { return false; }
+			if (obj == null) {
+				return false;
+			}
+
+			if (obj == this) {
+				return true;
+			}
+
+			if (obj.getClass() != getClass()) {
+				return false;
+			}
 
-			Result rhs = (Result)obj;
+			Result rhs = (Result) obj;
 
 			return new EqualsBuilder()
 				.append(counts, rhs.counts)

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index ee8dbaf..b281473 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -33,9 +33,9 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
-import org.apache.flink.graph.library.clustering.undirected.TriangleListing.Result;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.asm.result.TertiaryResult;
+import org.apache.flink.graph.library.clustering.undirected.TriangleListing.Result;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
@@ -51,16 +51,17 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Generates a listing of distinct triangles from the input graph.
- * <p>
- * A triangle is a 3-cycle with vertices A, B, and C connected by edges
+ *
+ * <p>A triangle is a 3-cycle with vertices A, B, and C connected by edges
  * (A, B), (A, C), and (B, C).
- * <p>
- * The input graph must be a simple, undirected graph containing no duplicate
+ *
+ * <p>The input graph must be a simple, undirected graph containing no duplicate
  * edges or self-loops.
- * <p>
- * Algorithm from "Finding, Counting and Listing all Triangles in Large Graphs,
+ *
+ * <p>Algorithm from "Finding, Counting and Listing all Triangles in Large Graphs,
  * An Experimental Study", Thomas Schank and Dorothea Wagner.
- * http://i11www.iti.uni-karlsruhe.de/extra/publications/sw-fclt-05_t.pdf
+ *
+ * <p>See http://i11www.iti.uni-karlsruhe.de/extra/publications/sw-fclt-05_t.pdf
  *
  * @param <K> graph ID type
  * @param <VV> vertex value type
@@ -111,7 +112,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! TriangleListing.class.isAssignableFrom(other.getClass())) {
+		if (!TriangleListing.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 
@@ -182,8 +183,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	/**
 	 * Removes edge values while filtering such that only edges where the
 	 * source vertex ID compares less than the target vertex ID are emitted.
-	 * <p>
-	 * Since the input graph is a simple graph this filter removes exactly half
+	 *
+	 * <p>Since the input graph is a simple graph this filter removes exactly half
 	 * of the original edges.
 	 *
 	 * @param <T> ID type
@@ -210,8 +211,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 * vertex has lower degree are emitted. If the source and target vertex
 	 * degrees are equal then the edge is emitted if the source vertex ID
 	 * compares less than the target vertex ID.
-	 * <p>
-	 * Since the input graph is a simple graph this filter removes exactly half
+	 *
+	 * <p>Since the input graph is a simple graph this filter removes exactly half
 	 * of the original edges.
 	 *
 	 * @param <T> ID type
@@ -270,7 +271,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 					out.collect(output);
 				}
 
-				if (! iter.hasNext()) {
+				if (!iter.hasNext()) {
 					break;
 				}
 
@@ -319,14 +320,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				throws Exception {
 			// by the triangle listing algorithm we know f1 < f2
 			if (value.f0.compareTo(value.f1) > 0) {
-				T temp_val = value.f0;
+				T tempVal = value.f0;
 				value.f0 = value.f1;
 
-				if (temp_val.compareTo(value.f2) <= 0) {
-					value.f1 = temp_val;
+				if (tempVal.compareTo(value.f2) <= 0) {
+					value.f1 = tempVal;
 				} else {
 					value.f1 = value.f2;
-					value.f2 = temp_val;
+					value.f2 = tempVal;
 				}
 			}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java
deleted file mode 100644
index a7d6ef1..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/Functions.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library.link_analysis;
-
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.types.DoubleValue;
-
-class Functions {
-
-	private Functions() {}
-
-	/**
-	 * Sum vertices' scores.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFields("0")
-	protected static final class SumScore<T>
-		implements ReduceFunction<Tuple2<T, DoubleValue>> {
-		@Override
-		public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> left, Tuple2<T, DoubleValue> right)
-			throws Exception {
-			left.f1.setValue(left.f1.getValue() + right.f1.getValue());
-			return left;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
deleted file mode 100644
index ba1ab21..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/HITS.java
+++ /dev/null
@@ -1,582 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library.link_analysis;
-
-import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
-import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.asm.result.UnaryResult;
-import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
-import org.apache.flink.graph.library.link_analysis.HITS.Result;
-import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collection;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
-/**
- * Hyperlink-Induced Topic Search computes two interdependent scores for every
- * vertex in a directed graph. A good "hub" links to good "authorities" and
- * good "authorities" are linked from good "hubs".
- * <p>
- * This algorithm can be configured to terminate either by a limit on the number
- * of iterations, a convergence threshold, or both.
- * <p>
- * http://www.cs.cornell.edu/home/kleinber/auth.pdf
- *
- * @param <K> graph ID type
- * @param <VV> vertex value type
- * @param <EV> edge value type
- */
-public class HITS<K, VV, EV>
-extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
-
-	private static final String CHANGE_IN_SCORES = "change in scores";
-
-	private static final String HUBBINESS_SUM_SQUARED = "hubbiness sum squared";
-
-	private static final String AUTHORITY_SUM_SQUARED = "authority sum squared";
-
-	// Required configuration
-	private int maxIterations;
-
-	private double convergenceThreshold;
-
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Hyperlink-Induced Topic Search with a fixed number of iterations.
-	 *
-	 * @param iterations fixed number of iterations
-	 */
-	public HITS(int iterations) {
-		this(iterations, Double.MAX_VALUE);
-	}
-
-	/**
-	 * Hyperlink-Induced Topic Search with a convergence threshold. The algorithm
-	 * terminates when the total change in hub and authority scores over all
-	 * vertices falls to or below the given threshold value.
-	 *
-	 * @param convergenceThreshold convergence threshold for sum of scores
-	 */
-	public HITS(double convergenceThreshold) {
-		this(Integer.MAX_VALUE, convergenceThreshold);
-	}
-
-	/**
-	 * Hyperlink-Induced Topic Search with a convergence threshold and a maximum
-	 * iteration count. The algorithm terminates after either the given number
-	 * of iterations or when the total change in hub and authority scores over all
-	 * vertices falls to or below the given threshold value.
-	 *
-	 * @param maxIterations maximum number of iterations
-	 * @param convergenceThreshold convergence threshold for sum of scores
-	 */
-	public HITS(int maxIterations, double convergenceThreshold) {
-		Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero");
-		Preconditions.checkArgument(convergenceThreshold > 0.0, "Convergence threshold must be greater than zero");
-
-		this.maxIterations = maxIterations;
-		this.convergenceThreshold = convergenceThreshold;
-	}
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public HITS<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
-	@Override
-	protected String getAlgorithmName() {
-		return HITS.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
-		Preconditions.checkNotNull(other);
-
-		if (! HITS.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
-
-		HITS rhs = (HITS) other;
-
-		// merge configurations
-
-		maxIterations = Math.max(maxIterations, rhs.maxIterations);
-		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
-	}
-
-	@Override
-	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
-			throws Exception {
-		DataSet<Tuple2<K, K>> edges = input
-			.getEdges()
-			.map(new ExtractEdgeIDs<K, EV>())
-				.setParallelism(parallelism)
-				.name("Extract edge IDs");
-
-		// ID, hub, authority
-		DataSet<Tuple3<K, DoubleValue, DoubleValue>> initialScores = edges
-			.map(new InitializeScores<K>())
-				.setParallelism(parallelism)
-				.name("Initial scores")
-			.groupBy(0)
-			.reduce(new SumScores<K>())
-			.setCombineHint(CombineHint.HASH)
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		IterativeDataSet<Tuple3<K, DoubleValue, DoubleValue>> iterative = initialScores
-			.iterate(maxIterations);
-
-		// ID, hubbiness
-		DataSet<Tuple2<K, DoubleValue>> hubbiness = iterative
-			.coGroup(edges)
-			.where(0)
-			.equalTo(1)
-			.with(new Hubbiness<K>())
-				.setParallelism(parallelism)
-				.name("Hub")
-			.groupBy(0)
-			.reduce(new SumScore<K>())
-			.setCombineHint(CombineHint.HASH)
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		// sum-of-hubbiness-squared
-		DataSet<DoubleValue> hubbinessSumSquared = hubbiness
-			.map(new Square<K>())
-				.setParallelism(parallelism)
-				.name("Square")
-			.reduce(new Sum())
-			.setCombineHint(CombineHint.HASH)
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		// ID, new authority
-		DataSet<Tuple2<K, DoubleValue>> authority = hubbiness
-			.coGroup(edges)
-			.where(0)
-			.equalTo(0)
-			.with(new Authority<K>())
-				.setParallelism(parallelism)
-				.name("Authority")
-			.groupBy(0)
-			.reduce(new SumScore<K>())
-			.setCombineHint(CombineHint.HASH)
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		// sum-of-authority-squared
-		DataSet<DoubleValue> authoritySumSquared = authority
-			.map(new Square<K>())
-				.setParallelism(parallelism)
-				.name("Square")
-			.reduce(new Sum())
-			.setCombineHint(CombineHint.HASH)
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		// ID, normalized hubbiness, normalized authority
-		DataSet<Tuple3<K, DoubleValue, DoubleValue>> scores = hubbiness
-			.fullOuterJoin(authority, JoinHint.REPARTITION_SORT_MERGE)
-			.where(0)
-			.equalTo(0)
-			.with(new JoinAndNormalizeHubAndAuthority<K>())
-			.withBroadcastSet(hubbinessSumSquared, HUBBINESS_SUM_SQUARED)
-			.withBroadcastSet(authoritySumSquared, AUTHORITY_SUM_SQUARED)
-				.setParallelism(parallelism)
-				.name("Join scores");
-
-		DataSet<Tuple3<K, DoubleValue, DoubleValue>> passThrough;
-
-		if (convergenceThreshold < Double.MAX_VALUE) {
-			passThrough = iterative
-				.fullOuterJoin(scores, JoinHint.REPARTITION_SORT_MERGE)
-				.where(0)
-				.equalTo(0)
-				.with(new ChangeInScores<K>())
-					.setParallelism(parallelism)
-					.name("Change in scores");
-
-			iterative.registerAggregationConvergenceCriterion(CHANGE_IN_SCORES, new DoubleSumAggregator(), new ScoreConvergence(convergenceThreshold));
-		} else {
-			passThrough = scores;
-		}
-
-		return iterative
-			.closeWith(passThrough)
-			.map(new TranslateResult<K>())
-				.setParallelism(parallelism)
-				.name("Map result");
-	}
-
-	/**
-	 * Map edges and remove the edge value.
-	 *
-	 * @param <T> ID type
-	 * @param <ET> edge value type
-	 *
-	 * @see Graph.ExtractEdgeIDsMapper
-	 */
-	@ForwardedFields("0; 1")
-	private static class ExtractEdgeIDs<T, ET>
-	implements MapFunction<Edge<T, ET>, Tuple2<T, T>> {
-		private Tuple2<T, T> output = new Tuple2<>();
-
-		@Override
-		public Tuple2<T, T> map(Edge<T, ET> value)
-				throws Exception {
-			output.f0 = value.f0;
-			output.f1 = value.f1;
-			return output;
-		}
-	}
-
-	/**
-	 * Initialize vertices' authority scores by assigning each vertex with an
-	 * initial hub score of 1.0. The hub scores are initialized to zero since
-	 * these will be computed based on the initial authority scores.
-	 *
-	 * The initial scores are non-normalized.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFields("1->0")
-	private static class InitializeScores<T>
-	implements MapFunction<Tuple2<T, T>, Tuple3<T, DoubleValue, DoubleValue>> {
-		private Tuple3<T, DoubleValue, DoubleValue> output = new Tuple3<>(null, new DoubleValue(0.0), new DoubleValue(1.0));
-
-		@Override
-		public Tuple3<T, DoubleValue, DoubleValue> map(Tuple2<T, T> value) throws Exception {
-			output.f0 = value.f1;
-			return output;
-		}
-	}
-
-	/**
-	 * Sum vertices' hub and authority scores.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFields("0")
-	private static class SumScores<T>
-	implements ReduceFunction<Tuple3<T, DoubleValue, DoubleValue>> {
-		@Override
-		public Tuple3<T, DoubleValue, DoubleValue> reduce(Tuple3<T, DoubleValue, DoubleValue> left, Tuple3<T, DoubleValue, DoubleValue> right)
-				throws Exception {
-			left.f1.setValue(left.f1.getValue() + right.f1.getValue());
-			left.f2.setValue(left.f2.getValue() + right.f2.getValue());
-			return left;
-		}
-	}
-
-	/**
-	 * The hub score is the sum of authority scores of vertices on out-edges.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFieldsFirst("2->1")
-	@ForwardedFieldsSecond("0")
-	private static class Hubbiness<T>
-	implements CoGroupFunction<Tuple3<T, DoubleValue, DoubleValue>, Tuple2<T, T>, Tuple2<T, DoubleValue>> {
-		private Tuple2<T, DoubleValue> output = new Tuple2<>();
-
-		@Override
-		public void coGroup(Iterable<Tuple3<T, DoubleValue, DoubleValue>> vertex, Iterable<Tuple2<T, T>> edges, Collector<Tuple2<T, DoubleValue>> out)
-				throws Exception {
-			output.f1 = vertex.iterator().next().f2;
-
-			for (Tuple2<T, T> edge : edges) {
-				output.f0 = edge.f0;
-				out.collect(output);
-			}
-		}
-	}
-
-	/**
-	 * The authority score is the sum of hub scores of vertices on in-edges.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFieldsFirst("1")
-	@ForwardedFieldsSecond("1->0")
-	private static class Authority<T>
-	implements CoGroupFunction<Tuple2<T, DoubleValue>, Tuple2<T, T>, Tuple2<T, DoubleValue>> {
-		private Tuple2<T, DoubleValue> output = new Tuple2<>();
-
-		@Override
-		public void coGroup(Iterable<Tuple2<T, DoubleValue>> vertex, Iterable<Tuple2<T, T>> edges, Collector<Tuple2<T, DoubleValue>> out)
-				throws Exception {
-			output.f1 = vertex.iterator().next().f1;
-
-			for (Tuple2<T, T> edge : edges) {
-				output.f0 = edge.f1;
-				out.collect(output);
-			}
-		}
-	}
-
-	/**
-	 * Compute the square of each score.
-	 *
-	 * @param <T> ID type
-	 */
-	private static class Square<T>
-	implements MapFunction<Tuple2<T, DoubleValue>, DoubleValue> {
-		private DoubleValue output = new DoubleValue();
-
-		@Override
-		public DoubleValue map(Tuple2<T, DoubleValue> value)
-				throws Exception {
-			double val = value.f1.getValue();
-			output.setValue(val * val);
-
-			return output;
-		}
-	}
-
-	/**
-	 * Sum over values. This specialized function is used in place of generic aggregation.
-	 */
-	private static class Sum
-	implements ReduceFunction<DoubleValue> {
-		@Override
-		public DoubleValue reduce(DoubleValue first, DoubleValue second)
-				throws Exception {
-			first.setValue(first.getValue() + second.getValue());
-			return first;
-		}
-	}
-
-	/**
-	 * Join and normalize the hub and authority scores.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFieldsFirst("0")
-	@ForwardedFieldsSecond("0")
-	private static class JoinAndNormalizeHubAndAuthority<T>
-	extends RichJoinFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>> {
-		private Tuple3<T, DoubleValue, DoubleValue> output = new Tuple3<>(null, new DoubleValue(), new DoubleValue());
-
-		private double hubbinessRootSumSquared;
-
-		private double authorityRootSumSquared;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-
-			Collection<DoubleValue> var;
-			var = getRuntimeContext().getBroadcastVariable(HUBBINESS_SUM_SQUARED);
-			hubbinessRootSumSquared = Math.sqrt(var.iterator().next().getValue());
-
-			var = getRuntimeContext().getBroadcastVariable(AUTHORITY_SUM_SQUARED);
-			authorityRootSumSquared = Math.sqrt(var.iterator().next().getValue());
-		}
-
-		@Override
-		public Tuple3<T, DoubleValue, DoubleValue> join(Tuple2<T, DoubleValue> hubbiness, Tuple2<T, DoubleValue> authority)
-				throws Exception {
-			output.f0 = (authority == null) ? hubbiness.f0 : authority.f0;
-			output.f1.setValue(hubbiness == null ? 0.0 : hubbiness.f1.getValue() / hubbinessRootSumSquared);
-			output.f2.setValue(authority == null ? 0.0 : authority.f1.getValue() / authorityRootSumSquared);
-			return output;
-		}
-	}
-
-	/**
-	 * Computes the total sum of the change in hub and authority scores over
-	 * all vertices between iterations. A negative score is emitted after the
-	 * first iteration to prevent premature convergence.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFieldsFirst("0")
-	@ForwardedFieldsSecond("*")
-	private static class ChangeInScores<T>
-	extends RichJoinFunction<Tuple3<T, DoubleValue, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>> {
-		private boolean isInitialSuperstep;
-
-		private double changeInScores;
-
-		@Override
-		public void open(Configuration parameters)
-				throws Exception {
-			super.open(parameters);
-
-			isInitialSuperstep = (getIterationRuntimeContext().getSuperstepNumber() == 1);
-			changeInScores = (isInitialSuperstep) ? -1.0 : 0.0;
-		}
-
-		@Override
-		public void close()
-				throws Exception {
-			super.close();
-
-			DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES);
-			agg.aggregate(changeInScores);
-		}
-
-		@Override
-		public Tuple3<T, DoubleValue, DoubleValue> join(Tuple3<T, DoubleValue, DoubleValue> first, Tuple3<T, DoubleValue, DoubleValue> second)
-				throws Exception {
-			if (! isInitialSuperstep) {
-				changeInScores += Math.abs(second.f1.getValue() - first.f1.getValue());
-				changeInScores += Math.abs(second.f2.getValue() - first.f2.getValue());
-			}
-
-			return second;
-		}
-	}
-
-	/**
-	 * Monitors the total change in hub and authority scores over all vertices.
-	 * The algorithm terminates when the change in scores compared against the
-	 * prior iteration falls to or below the given convergence threshold.
-	 *
-	 * An optimization of this implementation of HITS is to leave the initial
-	 * scores non-normalized; therefore, the change in scores after the first
-	 * superstep cannot be measured and a negative value is emitted to signal
-	 * that the iteration should continue.
-	 */
-	private static class ScoreConvergence
-	implements ConvergenceCriterion<DoubleValue> {
-		private double convergenceThreshold;
-
-		public ScoreConvergence(double convergenceThreshold) {
-			this.convergenceThreshold = convergenceThreshold;
-		}
-
-		@Override
-		public boolean isConverged(int iteration, DoubleValue value) {
-			double val = value.getValue();
-			return (0 <= val && val <= convergenceThreshold);
-		}
-	}
-
-	/**
-	 * Map the Tuple result to the return type.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFields("0; 1; 2")
-	private static class TranslateResult<T>
-	implements MapFunction<Tuple3<T, DoubleValue, DoubleValue>, Result<T>> {
-		private Result<T> output = new Result<>();
-
-		@Override
-		public Result<T> map(Tuple3<T, DoubleValue, DoubleValue> value) throws Exception {
-			output.f0 = value.f0;
-			output.f1 = value.f1;
-			output.f2 = value.f2;
-			return output;
-		}
-	}
-
-	/**
-	 * Wraps the {@link Tuple3} to encapsulate results from the HITS algorithm.
-	 *
-	 * @param <T> ID type
-	 */
-	public static class Result<T>
-	extends Tuple3<T, DoubleValue, DoubleValue>
-	implements PrintableResult, UnaryResult<T> {
-		public static final int HASH_SEED = 0xc7e39a63;
-
-		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
-
-		@Override
-		public T getVertexId0() {
-			return f0;
-		}
-
-		@Override
-		public void setVertexId0(T value) {
-			f0 = value;
-		}
-
-		/**
-		 * Get the hub score. Good hubs link to good authorities.
-		 *
-		 * @return the hub score
-		 */
-		public DoubleValue getHubScore() {
-			return f1;
-		}
-
-		/**
-		 * Get the authority score. Good authorities link to good hubs.
-		 *
-		 * @return the authority score
-		 */
-		public DoubleValue getAuthorityScore() {
-			return f2;
-		}
-
-		public String toPrintableString() {
-			return "Vertex ID: " + getVertexId0()
-				+ ", hub score: " + getHubScore()
-				+ ", authority score: " + getAuthorityScore();
-		}
-
-		@Override
-		public int hashCode() {
-			return hasher.reset()
-				.hash(f0.hashCode())
-				.hash(f1.getValue())
-				.hash(f2.getValue())
-				.hash();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
deleted file mode 100644
index 747735e..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/link_analysis/PageRank.java
+++ /dev/null
@@ -1,544 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library.link_analysis;
-
-import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
-import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.functions.RichJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
-import org.apache.flink.api.java.operators.IterativeDataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.asm.degree.annotate.directed.EdgeSourceDegrees;
-import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
-import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.asm.result.PrintableResult;
-import org.apache.flink.graph.asm.result.UnaryResult;
-import org.apache.flink.graph.library.link_analysis.Functions.SumScore;
-import org.apache.flink.graph.library.link_analysis.PageRank.Result;
-import org.apache.flink.graph.utils.GraphUtils;
-import org.apache.flink.graph.utils.Murmur3_32;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
-import org.apache.flink.types.DoubleValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import java.util.Collection;
-import java.util.Iterator;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
-/**
- * PageRank computes a per-vertex score which is the sum of PageRank scores
- * transmitted over in-edges. Each vertex's score is divided evenly among
- * out-edges. High-scoring vertices are linked to by other high-scoring
- * vertices; this is similar to the 'authority' score in {@link HITS}.
- *
- * http://ilpubs.stanford.edu:8090/422/1/1999-66.pdf
- *
- * @param <K> graph ID type
- * @param <VV> vertex value type
- * @param <EV> edge value type
- */
-public class PageRank<K, VV, EV>
-extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
-
-	private static final String VERTEX_COUNT = "vertex count";
-
-	private static final String SUM_OF_SCORES = "sum of scores";
-
-	private static final String CHANGE_IN_SCORES = "change in scores";
-
-	// Required configuration
-	private final double dampingFactor;
-
-	private int maxIterations;
-
-	private double convergenceThreshold;
-
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * PageRank with a fixed number of iterations.
-	 *
-	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
-	 * @param iterations fixed number of iterations
-	 */
-	public PageRank(double dampingFactor, int iterations) {
-		this(dampingFactor, iterations, Double.MAX_VALUE);
-	}
-
-	/**
-	 * PageRank with a convergence threshold. The algorithm terminates when the
-	 * change in score over all vertices falls to or below the given threshold value.
-	 *
-	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
-	 * @param convergenceThreshold convergence threshold for sum of scores
-	 */
-	public PageRank(double dampingFactor, double convergenceThreshold) {
-		this(dampingFactor, Integer.MAX_VALUE, convergenceThreshold);
-	}
-
-	/**
-	 * PageRank with a convergence threshold and a maximum iteration count. The
-	 * algorithm terminates after either the given number of iterations or when
-	 * the change in score over all vertices falls to or below the given
-	 * threshold value.
-	 *
-	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
-	 * @param maxIterations maximum number of iterations
-	 * @param convergenceThreshold convergence threshold for sum of scores
-	 */
-	public PageRank(double dampingFactor, int maxIterations, double convergenceThreshold) {
-		Preconditions.checkArgument(0 < dampingFactor && dampingFactor < 1,
-			"Damping factor must be between zero and one");
-		Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero");
-		Preconditions.checkArgument(convergenceThreshold > 0.0, "Convergence threshold must be greater than zero");
-
-		this.dampingFactor = dampingFactor;
-		this.maxIterations = maxIterations;
-		this.convergenceThreshold = convergenceThreshold;
-	}
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public PageRank<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
-	@Override
-	protected String getAlgorithmName() {
-		return PageRank.class.getName();
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
-		Preconditions.checkNotNull(other);
-
-		if (! PageRank.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
-
-		PageRank rhs = (PageRank) other;
-
-		// merge configurations
-
-		maxIterations = Math.max(maxIterations, rhs.maxIterations);
-		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
-	}
-
-	@Override
-	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
-			throws Exception {
-		// vertex degree
-		DataSet<Vertex<K, Degrees>> vertexDegree = input
-			.run(new VertexDegrees<K, VV, EV>()
-				.setParallelism(parallelism));
-
-		// vertex count
-		DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree);
-
-		// s, t, d(s)
-		DataSet<Edge<K, LongValue>> edgeSourceDegree = input
-			.run(new EdgeSourceDegrees<K, VV, EV>()
-				.setParallelism(parallelism))
-			.map(new ExtractSourceDegree<K, EV>())
-				.setParallelism(parallelism)
-				.name("Extract source degree");
-
-		// vertices with zero in-edges
-		DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree
-			.flatMap(new InitializeSourceVertices<K>())
-			.withBroadcastSet(vertexCount, VERTEX_COUNT)
-				.setParallelism(parallelism)
-				.name("Initialize source vertex scores");
-
-		// s, initial pagerank(s)
-		DataSet<Tuple2<K, DoubleValue>> initialScores = vertexDegree
-			.map(new InitializeVertexScores<K>())
-			.withBroadcastSet(vertexCount, VERTEX_COUNT)
-				.setParallelism(parallelism)
-				.name("Initialize scores");
-
-		IterativeDataSet<Tuple2<K, DoubleValue>> iterative = initialScores
-			.iterate(maxIterations);
-
-		// s, projected pagerank(s)
-		DataSet<Tuple2<K, DoubleValue>> vertexScores = iterative
-			.coGroup(edgeSourceDegree)
-			.where(0)
-			.equalTo(0)
-			.with(new SendScore<K>())
-				.setParallelism(parallelism)
-				.name("Send score")
-			.groupBy(0)
-			.reduce(new SumScore<K>())
-			.setCombineHint(CombineHint.HASH)
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		// ignored ID, total pagerank
-		DataSet<Tuple2<K, DoubleValue>> sumOfScores = vertexScores
-			.reduce(new SumVertexScores<K>())
-				.setParallelism(parallelism)
-				.name("Sum");
-
-		// s, adjusted pagerank(s)
-		DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores
-			.union(sourceVertices)
-				.setParallelism(parallelism)
-				.name("Union with source vertices")
-			.map(new AdjustScores<K>(dampingFactor))
-				.withBroadcastSet(sumOfScores, SUM_OF_SCORES)
-				.withBroadcastSet(vertexCount, VERTEX_COUNT)
-					.setParallelism(parallelism)
-					.name("Adjust scores");
-
-		DataSet<Tuple2<K, DoubleValue>> passThrough;
-
-		if (convergenceThreshold < Double.MAX_VALUE) {
-			passThrough = iterative
-				.join(adjustedScores)
-				.where(0)
-				.equalTo(0)
-				.with(new ChangeInScores<K>())
-					.setParallelism(parallelism)
-					.name("Change in scores");
-
-			iterative.registerAggregationConvergenceCriterion(CHANGE_IN_SCORES, new DoubleSumAggregator(), new ScoreConvergence(convergenceThreshold));
-		} else {
-			passThrough = adjustedScores;
-		}
-
-		return iterative
-			.closeWith(passThrough)
-			.map(new TranslateResult<K>())
-				.setParallelism(parallelism)
-				.name("Map result");
-	}
-
-	/**
-	 * Remove the unused original edge value and extract the out-degree.
-	 *
-	 * @param <T> ID type
-	 * @param <ET> edge value type
-	 */
-	@ForwardedFields("0; 1")
-	private static class ExtractSourceDegree<T, ET>
-	implements MapFunction<Edge<T, Tuple2<ET, Degrees>>, Edge<T, LongValue>> {
-		Edge<T, LongValue> output = new Edge<>();
-
-		@Override
-		public Edge<T, LongValue> map(Edge<T, Tuple2<ET, Degrees>> edge)
-				throws Exception {
-			output.f0 = edge.f0;
-			output.f1 = edge.f1;
-			output.f2 = edge.f2.f1.getOutDegree();
-			return output;
-		}
-	}
-
-	/**
-	 * Source vertices have no in-edges so have a projected score of 0.0.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFields("0")
-	private static class InitializeSourceVertices<T>
-	implements FlatMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> {
-		private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue(0.0));
-
-		@Override
-		public void flatMap(Vertex<T, Degrees> vertex, Collector<Tuple2<T, DoubleValue>> out)
-				throws Exception {
-			if (vertex.f1.getInDegree().getValue() == 0) {
-				output.f0 = vertex.f0;
-				out.collect(output);
-			}
-		}
-	}
-
-	/**
-	 * PageRank scores sum to 1.0 so initialize each vertex with the inverse of
-	 * the number of vertices.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFields("0")
-	private static class InitializeVertexScores<T>
-	extends RichMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> {
-		private Tuple2<T, DoubleValue> output = new Tuple2<>();
-
-		@Override
-		public void open(Configuration parameters)
-				throws Exception {
-			super.open(parameters);
-
-			Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
-			output.f1 = new DoubleValue(1.0 / vertexCount.iterator().next().getValue());
-		}
-
-		@Override
-		public Tuple2<T, DoubleValue> map(Vertex<T, Degrees> vertex)
-				throws Exception {
-			output.f0 = vertex.f0;
-			return output;
-		}
-	}
-
-	/**
-	 * The PageRank score for each vertex is divided evenly and projected to
-	 * neighbors on out-edges.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFieldsSecond("1->0")
-	private static class SendScore<T>
-	implements CoGroupFunction<Tuple2<T, DoubleValue>, Edge<T, LongValue>, Tuple2<T, DoubleValue>> {
-		private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue());
-
-		@Override
-		public void coGroup(Iterable<Tuple2<T, DoubleValue>> vertex, Iterable<Edge<T, LongValue>> edges, Collector<Tuple2<T, DoubleValue>> out)
-				throws Exception {
-			Iterator<Edge<T, LongValue>> edgeIterator = edges.iterator();
-
-			if (edgeIterator.hasNext()) {
-				Edge<T, LongValue> edge = edgeIterator.next();
-
-				output.f0 = edge.f1;
-				output.f1.setValue(vertex.iterator().next().f1.getValue() / edge.f2.getValue());
-				out.collect(output);
-
-				while (edgeIterator.hasNext()) {
-					edge = edgeIterator.next();
-					output.f0 = edge.f1;
-					out.collect(output);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Sum the PageRank score over all vertices. The vertex ID must be ignored
-	 * but is retained rather than adding another operator.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFields("0")
-	private static class SumVertexScores<T>
-	implements ReduceFunction<Tuple2<T, DoubleValue>> {
-		@Override
-		public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second)
-				throws Exception {
-			first.f1.setValue(first.f1.getValue() + second.f1.getValue());
-			return first;
-		}
-	}
-
-	/**
-	 * Each iteration the per-vertex scores are adjusted with the damping
-	 * factor. Each score is multiplied by the damping factor then added to the
-	 * probability of a "random hop", which is one minus the damping factor.
-	 *
-	 * This operation also accounts for 'sink' vertices, which have no
-	 * out-edges to project score to. The sink scores are computed by taking
-	 * one minus the sum of vertex scores, which also includes precision error.
-	 * This 'missing' score is evenly distributed across vertices as with the
-	 * random hop.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFields("0")
-	private static class AdjustScores<T>
-	extends RichMapFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
-		private double dampingFactor;
-
-		private long vertexCount;
-
-		private double uniformlyDistributedScore;
-
-		public AdjustScores(double dampingFactor) {
-			this.dampingFactor = dampingFactor;
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-
-			Collection<Tuple2<T, DoubleValue>> sumOfScores = getRuntimeContext().getBroadcastVariable(SUM_OF_SCORES);
-			// floating point precision error is also included in sumOfSinks
-			double sumOfSinks = 1 - sumOfScores.iterator().next().f1.getValue();
-
-			Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
-			this.vertexCount = vertexCount.iterator().next().getValue();
-
-			this.uniformlyDistributedScore = ((1 - dampingFactor) + dampingFactor * sumOfSinks) / this.vertexCount;
-		}
-
-		@Override
-		public Tuple2<T, DoubleValue> map(Tuple2<T, DoubleValue> value) throws Exception {
-			value.f1.setValue(uniformlyDistributedScore + (dampingFactor * value.f1.getValue()));
-			return value;
-		}
-	}
-
-	/**
-	 * Computes the sum of the absolute change in vertex PageRank scores
-	 * between iterations.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFieldsFirst("0")
-	@ForwardedFieldsSecond("*")
-	private static class ChangeInScores<T>
-	extends RichJoinFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
-		private double changeInScores;
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			super.open(parameters);
-
-			changeInScores = 0.0;
-		}
-
-		@Override
-		public void close()
-				throws Exception {
-			super.close();
-
-			DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES);
-			agg.aggregate(changeInScores);
-		}
-
-		@Override
-		public Tuple2<T, DoubleValue> join(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second)
-				throws Exception {
-			changeInScores += Math.abs(second.f1.getValue() - first.f1.getValue());
-			return second;
-		}
-	}
-
-	/**
-	 * Monitors the sum of the absolute change in vertex scores. The algorithm
-	 * terminates when the change in scores compared against the prior iteration
-	 * falls to or below the given convergence threshold.
-	 */
-	private static class ScoreConvergence
-	implements ConvergenceCriterion<DoubleValue> {
-		private double convergenceThreshold;
-
-		public ScoreConvergence(double convergenceThreshold) {
-			this.convergenceThreshold = convergenceThreshold;
-		}
-
-		@Override
-		public boolean isConverged(int iteration, DoubleValue value) {
-			double val = value.getValue();
-			return (val <= convergenceThreshold);
-		}
-	}
-
-	/**
-	 * Map the Tuple result to the return type.
-	 *
-	 * @param <T> ID type
-	 */
-	@ForwardedFields("0; 1")
-	private static class TranslateResult<T>
-		implements MapFunction<Tuple2<T, DoubleValue>, Result<T>> {
-		private Result<T> output = new Result<>();
-
-		@Override
-		public Result<T> map(Tuple2<T, DoubleValue> value) throws Exception {
-			output.f0 = value.f0;
-			output.f1 = value.f1;
-			return output;
-		}
-	}
-
-	/**
-	 * Wraps the {@link Tuple2} to encapsulate results from the PageRank algorithm.
-	 *
-	 * @param <T> ID type
-	 */
-	public static class Result<T>
-	extends Tuple2<T, DoubleValue>
-	implements PrintableResult, UnaryResult<T> {
-		public static final int HASH_SEED = 0x4010af29;
-
-		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
-
-		@Override
-		public T getVertexId0() {
-			return f0;
-		}
-
-		@Override
-		public void setVertexId0(T value) {
-			f0 = value;
-		}
-
-		/**
-		 * Get the PageRank score.
-		 *
-		 * @return the PageRank score
-		 */
-		public DoubleValue getPageRankScore() {
-			return f1;
-		}
-
-		@Override
-		public String toPrintableString() {
-			return "Vertex ID: " + getVertexId0()
-				+ ", PageRank score: " + getPageRankScore();
-		}
-
-		@Override
-		public int hashCode() {
-			return hasher.reset()
-				.hash(f0.hashCode())
-				.hash(f1.getValue())
-				.hash();
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/Functions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/Functions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/Functions.java
new file mode 100644
index 0000000..0fdd46a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/Functions.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.linkanalysis;
+
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.types.DoubleValue;
+
+class Functions {
+
+	private Functions() {}
+
+	/**
+	 * Sums vertex scores by adding the right score into the left tuple in place; the left tuple is returned.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0")
+	protected static final class SumScore<T>
+		implements ReduceFunction<Tuple2<T, DoubleValue>> {
+		@Override
+		public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> left, Tuple2<T, DoubleValue> right)
+			throws Exception {
+			left.f1.setValue(left.f1.getValue() + right.f1.getValue());
+			return left;
+		}
+	}
+}