You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 19:16:16 UTC

[07/15] flink git commit: [FLINK-6709] [gelly] Activate strict checkstyle for flink-gellies

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
new file mode 100644
index 0000000..e8422ac
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
@@ -0,0 +1,582 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.linkanalysis;
+
+import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.asm.result.UnaryResult;
+import org.apache.flink.graph.library.linkanalysis.Functions.SumScore;
+import org.apache.flink.graph.library.linkanalysis.HITS.Result;
+import org.apache.flink.graph.utils.MurmurHash;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Hyperlink-Induced Topic Search computes two interdependent scores for every
+ * vertex in a directed graph. A good "hub" links to good "authorities" and
+ * good "authorities" are linked from good "hubs".
+ *
+ * <p>This algorithm can be configured to terminate either by a limit on the number
+ * of iterations, a convergence threshold, or both.
+ *
+ * <p>See http://www.cs.cornell.edu/home/kleinber/auth.pdf
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class HITS<K, VV, EV>
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
+
+	private static final String CHANGE_IN_SCORES = "change in scores";
+
+	private static final String HUBBINESS_SUM_SQUARED = "hubbiness sum squared";
+
+	private static final String AUTHORITY_SUM_SQUARED = "authority sum squared";
+
+	// Required configuration
+	private int maxIterations;
+
+	private double convergenceThreshold;
+
+	// Optional configuration
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Hyperlink-Induced Topic Search with a fixed number of iterations.
+	 *
+	 * @param iterations fixed number of iterations
+	 */
+	public HITS(int iterations) {
+		this(iterations, Double.MAX_VALUE);
+	}
+
+	/**
+	 * Hyperlink-Induced Topic Search with a convergence threshold. The algorithm
+	 * terminates when the total change in hub and authority scores over all
+	 * vertices falls to or below the given threshold value.
+	 *
+	 * @param convergenceThreshold convergence threshold for sum of scores
+	 */
+	public HITS(double convergenceThreshold) {
+		this(Integer.MAX_VALUE, convergenceThreshold);
+	}
+
+	/**
+	 * Hyperlink-Induced Topic Search with a convergence threshold and a maximum
+	 * iteration count. The algorithm terminates after either the given number
+	 * of iterations or when the total change in hub and authority scores over all
+	 * vertices falls to or below the given threshold value.
+	 *
+	 * @param maxIterations maximum number of iterations
+	 * @param convergenceThreshold convergence threshold for sum of scores
+	 */
+	public HITS(int maxIterations, double convergenceThreshold) {
+		Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero");
+		Preconditions.checkArgument(convergenceThreshold > 0.0, "Convergence threshold must be greater than zero");
+
+		this.maxIterations = maxIterations;
+		this.convergenceThreshold = convergenceThreshold;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public HITS<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	protected String getAlgorithmName() {
+		return HITS.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (!HITS.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		HITS rhs = (HITS) other;
+
+		// merge configurations
+
+		maxIterations = Math.max(maxIterations, rhs.maxIterations);
+		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
+		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
+			throws Exception {
+		DataSet<Tuple2<K, K>> edges = input
+			.getEdges()
+			.map(new ExtractEdgeIDs<K, EV>())
+				.setParallelism(parallelism)
+				.name("Extract edge IDs");
+
+		// ID, hub, authority
+		DataSet<Tuple3<K, DoubleValue, DoubleValue>> initialScores = edges
+			.map(new InitializeScores<K>())
+				.setParallelism(parallelism)
+				.name("Initial scores")
+			.groupBy(0)
+			.reduce(new SumScores<K>())
+			.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		IterativeDataSet<Tuple3<K, DoubleValue, DoubleValue>> iterative = initialScores
+			.iterate(maxIterations);
+
+		// ID, hubbiness
+		DataSet<Tuple2<K, DoubleValue>> hubbiness = iterative
+			.coGroup(edges)
+			.where(0)
+			.equalTo(1)
+			.with(new Hubbiness<K>())
+				.setParallelism(parallelism)
+				.name("Hub")
+			.groupBy(0)
+			.reduce(new SumScore<K>())
+			.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// sum-of-hubbiness-squared
+		DataSet<DoubleValue> hubbinessSumSquared = hubbiness
+			.map(new Square<K>())
+				.setParallelism(parallelism)
+				.name("Square")
+			.reduce(new Sum())
+			.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// ID, new authority
+		DataSet<Tuple2<K, DoubleValue>> authority = hubbiness
+			.coGroup(edges)
+			.where(0)
+			.equalTo(0)
+			.with(new Authority<K>())
+				.setParallelism(parallelism)
+				.name("Authority")
+			.groupBy(0)
+			.reduce(new SumScore<K>())
+			.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// sum-of-authority-squared
+		DataSet<DoubleValue> authoritySumSquared = authority
+			.map(new Square<K>())
+				.setParallelism(parallelism)
+				.name("Square")
+			.reduce(new Sum())
+			.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// ID, normalized hubbiness, normalized authority
+		DataSet<Tuple3<K, DoubleValue, DoubleValue>> scores = hubbiness
+			.fullOuterJoin(authority, JoinHint.REPARTITION_SORT_MERGE)
+			.where(0)
+			.equalTo(0)
+			.with(new JoinAndNormalizeHubAndAuthority<K>())
+			.withBroadcastSet(hubbinessSumSquared, HUBBINESS_SUM_SQUARED)
+			.withBroadcastSet(authoritySumSquared, AUTHORITY_SUM_SQUARED)
+				.setParallelism(parallelism)
+				.name("Join scores");
+
+		DataSet<Tuple3<K, DoubleValue, DoubleValue>> passThrough;
+
+		if (convergenceThreshold < Double.MAX_VALUE) {
+			passThrough = iterative
+				.fullOuterJoin(scores, JoinHint.REPARTITION_SORT_MERGE)
+				.where(0)
+				.equalTo(0)
+				.with(new ChangeInScores<K>())
+					.setParallelism(parallelism)
+					.name("Change in scores");
+
+			iterative.registerAggregationConvergenceCriterion(CHANGE_IN_SCORES, new DoubleSumAggregator(), new ScoreConvergence(convergenceThreshold));
+		} else {
+			passThrough = scores;
+		}
+
+		return iterative
+			.closeWith(passThrough)
+			.map(new TranslateResult<K>())
+				.setParallelism(parallelism)
+				.name("Map result");
+	}
+
+	/**
+	 * Map edges and remove the edge value.
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 *
+	 * @see Graph.ExtractEdgeIDsMapper
+	 */
+	@ForwardedFields("0; 1")
+	private static class ExtractEdgeIDs<T, ET>
+	implements MapFunction<Edge<T, ET>, Tuple2<T, T>> {
+		private Tuple2<T, T> output = new Tuple2<>();
+
+		@Override
+		public Tuple2<T, T> map(Edge<T, ET> value)
+				throws Exception {
+			output.f0 = value.f0;
+			output.f1 = value.f1;
+			return output;
+		}
+	}
+
+	/**
+	 * Initialize vertices' authority scores by assigning each vertex with an
+	 * initial hub score of 1.0. The hub scores are initialized to zero since
+	 * these will be computed based on the initial authority scores.
+	 *
+	 * <p>The initial scores are non-normalized.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("1->0")
+	private static class InitializeScores<T>
+	implements MapFunction<Tuple2<T, T>, Tuple3<T, DoubleValue, DoubleValue>> {
+		private Tuple3<T, DoubleValue, DoubleValue> output = new Tuple3<>(null, new DoubleValue(0.0), new DoubleValue(1.0));
+
+		@Override
+		public Tuple3<T, DoubleValue, DoubleValue> map(Tuple2<T, T> value) throws Exception {
+			output.f0 = value.f1;
+			return output;
+		}
+	}
+
+	/**
+	 * Sum vertices' hub and authority scores.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0")
+	private static class SumScores<T>
+	implements ReduceFunction<Tuple3<T, DoubleValue, DoubleValue>> {
+		@Override
+		public Tuple3<T, DoubleValue, DoubleValue> reduce(Tuple3<T, DoubleValue, DoubleValue> left, Tuple3<T, DoubleValue, DoubleValue> right)
+				throws Exception {
+			left.f1.setValue(left.f1.getValue() + right.f1.getValue());
+			left.f2.setValue(left.f2.getValue() + right.f2.getValue());
+			return left;
+		}
+	}
+
+	/**
+	 * The hub score is the sum of authority scores of vertices on out-edges.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFieldsFirst("2->1")
+	@ForwardedFieldsSecond("0")
+	private static class Hubbiness<T>
+	implements CoGroupFunction<Tuple3<T, DoubleValue, DoubleValue>, Tuple2<T, T>, Tuple2<T, DoubleValue>> {
+		private Tuple2<T, DoubleValue> output = new Tuple2<>();
+
+		@Override
+		public void coGroup(Iterable<Tuple3<T, DoubleValue, DoubleValue>> vertex, Iterable<Tuple2<T, T>> edges, Collector<Tuple2<T, DoubleValue>> out)
+				throws Exception {
+			output.f1 = vertex.iterator().next().f2;
+
+			for (Tuple2<T, T> edge : edges) {
+				output.f0 = edge.f0;
+				out.collect(output);
+			}
+		}
+	}
+
+	/**
+	 * The authority score is the sum of hub scores of vertices on in-edges.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFieldsFirst("1")
+	@ForwardedFieldsSecond("1->0")
+	private static class Authority<T>
+	implements CoGroupFunction<Tuple2<T, DoubleValue>, Tuple2<T, T>, Tuple2<T, DoubleValue>> {
+		private Tuple2<T, DoubleValue> output = new Tuple2<>();
+
+		@Override
+		public void coGroup(Iterable<Tuple2<T, DoubleValue>> vertex, Iterable<Tuple2<T, T>> edges, Collector<Tuple2<T, DoubleValue>> out)
+				throws Exception {
+			output.f1 = vertex.iterator().next().f1;
+
+			for (Tuple2<T, T> edge : edges) {
+				output.f0 = edge.f1;
+				out.collect(output);
+			}
+		}
+	}
+
+	/**
+	 * Compute the square of each score.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class Square<T>
+	implements MapFunction<Tuple2<T, DoubleValue>, DoubleValue> {
+		private DoubleValue output = new DoubleValue();
+
+		@Override
+		public DoubleValue map(Tuple2<T, DoubleValue> value)
+				throws Exception {
+			double val = value.f1.getValue();
+			output.setValue(val * val);
+
+			return output;
+		}
+	}
+
+	/**
+	 * Sum over values. This specialized function is used in place of generic aggregation.
+	 */
+	private static class Sum
+	implements ReduceFunction<DoubleValue> {
+		@Override
+		public DoubleValue reduce(DoubleValue first, DoubleValue second)
+				throws Exception {
+			first.setValue(first.getValue() + second.getValue());
+			return first;
+		}
+	}
+
+	/**
+	 * Join and normalize the hub and authority scores.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFieldsFirst("0")
+	@ForwardedFieldsSecond("0")
+	private static class JoinAndNormalizeHubAndAuthority<T>
+	extends RichJoinFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>> {
+		private Tuple3<T, DoubleValue, DoubleValue> output = new Tuple3<>(null, new DoubleValue(), new DoubleValue());
+
+		private double hubbinessRootSumSquared;
+
+		private double authorityRootSumSquared;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			Collection<DoubleValue> var;
+			var = getRuntimeContext().getBroadcastVariable(HUBBINESS_SUM_SQUARED);
+			hubbinessRootSumSquared = Math.sqrt(var.iterator().next().getValue());
+
+			var = getRuntimeContext().getBroadcastVariable(AUTHORITY_SUM_SQUARED);
+			authorityRootSumSquared = Math.sqrt(var.iterator().next().getValue());
+		}
+
+		@Override
+		public Tuple3<T, DoubleValue, DoubleValue> join(Tuple2<T, DoubleValue> hubbiness, Tuple2<T, DoubleValue> authority)
+				throws Exception {
+			output.f0 = (authority == null) ? hubbiness.f0 : authority.f0;
+			output.f1.setValue(hubbiness == null ? 0.0 : hubbiness.f1.getValue() / hubbinessRootSumSquared);
+			output.f2.setValue(authority == null ? 0.0 : authority.f1.getValue() / authorityRootSumSquared);
+			return output;
+		}
+	}
+
+	/**
+	 * Computes the total sum of the change in hub and authority scores over
+	 * all vertices between iterations. A negative score is emitted after the
+	 * first iteration to prevent premature convergence.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFieldsFirst("0")
+	@ForwardedFieldsSecond("*")
+	private static class ChangeInScores<T>
+	extends RichJoinFunction<Tuple3<T, DoubleValue, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>, Tuple3<T, DoubleValue, DoubleValue>> {
+		private boolean isInitialSuperstep;
+
+		private double changeInScores;
+
+		@Override
+		public void open(Configuration parameters)
+				throws Exception {
+			super.open(parameters);
+
+			isInitialSuperstep = (getIterationRuntimeContext().getSuperstepNumber() == 1);
+			changeInScores = (isInitialSuperstep) ? -1.0 : 0.0;
+		}
+
+		@Override
+		public void close()
+				throws Exception {
+			super.close();
+
+			DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES);
+			agg.aggregate(changeInScores);
+		}
+
+		@Override
+		public Tuple3<T, DoubleValue, DoubleValue> join(Tuple3<T, DoubleValue, DoubleValue> first, Tuple3<T, DoubleValue, DoubleValue> second)
+				throws Exception {
+			if (!isInitialSuperstep) {
+				changeInScores += Math.abs(second.f1.getValue() - first.f1.getValue());
+				changeInScores += Math.abs(second.f2.getValue() - first.f2.getValue());
+			}
+
+			return second;
+		}
+	}
+
+	/**
+	 * Monitors the total change in hub and authority scores over all vertices.
+	 * The algorithm terminates when the change in scores compared against the
+	 * prior iteration falls to or below the given convergence threshold.
+	 *
+	 * <p>An optimization of this implementation of HITS is to leave the initial
+	 * scores non-normalized; therefore, the change in scores after the first
+	 * superstep cannot be measured and a negative value is emitted to signal
+	 * that the iteration should continue.
+	 */
+	private static class ScoreConvergence
+	implements ConvergenceCriterion<DoubleValue> {
+		private double convergenceThreshold;
+
+		public ScoreConvergence(double convergenceThreshold) {
+			this.convergenceThreshold = convergenceThreshold;
+		}
+
+		@Override
+		public boolean isConverged(int iteration, DoubleValue value) {
+			double val = value.getValue();
+			return (0 <= val && val <= convergenceThreshold);
+		}
+	}
+
+	/**
+	 * Map the Tuple result to the return type.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0; 1; 2")
+	private static class TranslateResult<T>
+	implements MapFunction<Tuple3<T, DoubleValue, DoubleValue>, Result<T>> {
+		private Result<T> output = new Result<>();
+
+		@Override
+		public Result<T> map(Tuple3<T, DoubleValue, DoubleValue> value) throws Exception {
+			output.f0 = value.f0;
+			output.f1 = value.f1;
+			output.f2 = value.f2;
+			return output;
+		}
+	}
+
+	/**
+	 * Wraps the {@link Tuple3} to encapsulate results from the HITS algorithm.
+	 *
+	 * @param <T> ID type
+	 */
+	public static class Result<T>
+	extends Tuple3<T, DoubleValue, DoubleValue>
+	implements PrintableResult, UnaryResult<T> {
+		public static final int HASH_SEED = 0xc7e39a63;
+
+		private MurmurHash hasher = new MurmurHash(HASH_SEED);
+
+		@Override
+		public T getVertexId0() {
+			return f0;
+		}
+
+		@Override
+		public void setVertexId0(T value) {
+			f0 = value;
+		}
+
+		/**
+		 * Get the hub score. Good hubs link to good authorities.
+		 *
+		 * @return the hub score
+		 */
+		public DoubleValue getHubScore() {
+			return f1;
+		}
+
+		/**
+		 * Get the authority score. Good authorities link to good hubs.
+		 *
+		 * @return the authority score
+		 */
+		public DoubleValue getAuthorityScore() {
+			return f2;
+		}
+
+		public String toPrintableString() {
+			return "Vertex ID: " + getVertexId0()
+				+ ", hub score: " + getHubScore()
+				+ ", authority score: " + getAuthorityScore();
+		}
+
+		@Override
+		public int hashCode() {
+			return hasher.reset()
+				.hash(f0.hashCode())
+				.hash(f1.getValue())
+				.hash(f2.getValue())
+				.hash();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
new file mode 100644
index 0000000..ecd5f39
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -0,0 +1,544 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.linkanalysis;
+
+import org.apache.flink.api.common.aggregators.ConvergenceCriterion;
+import org.apache.flink.api.common.aggregators.DoubleSumAggregator;
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.RichJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.operators.IterativeDataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.directed.EdgeSourceDegrees;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.asm.result.PrintableResult;
+import org.apache.flink.graph.asm.result.UnaryResult;
+import org.apache.flink.graph.library.linkanalysis.Functions.SumScore;
+import org.apache.flink.graph.library.linkanalysis.PageRank.Result;
+import org.apache.flink.graph.utils.GraphUtils;
+import org.apache.flink.graph.utils.MurmurHash;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
+import org.apache.flink.types.DoubleValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Iterator;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * PageRank computes a per-vertex score which is the sum of PageRank scores
+ * transmitted over in-edges. Each vertex's score is divided evenly among
+ * out-edges. High-scoring vertices are linked to by other high-scoring
+ * vertices; this is similar to the 'authority' score in {@link HITS}.
+ *
+ * <p>See http://ilpubs.stanford.edu:8090/422/1/1999-66.pdf
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class PageRank<K, VV, EV>
+extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
+
+	private static final String VERTEX_COUNT = "vertex count";
+
+	private static final String SUM_OF_SCORES = "sum of scores";
+
+	private static final String CHANGE_IN_SCORES = "change in scores";
+
+	// Required configuration
+	private final double dampingFactor;
+
+	private int maxIterations;
+
+	private double convergenceThreshold;
+
+	// Optional configuration
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * PageRank with a fixed number of iterations.
+	 *
+	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
+	 * @param iterations fixed number of iterations
+	 */
+	public PageRank(double dampingFactor, int iterations) {
+		this(dampingFactor, iterations, Double.MAX_VALUE);
+	}
+
+	/**
+	 * PageRank with a convergence threshold. The algorithm terminates when the
+	 * change in score over all vertices falls to or below the given threshold value.
+	 *
+	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
+	 * @param convergenceThreshold convergence threshold for sum of scores
+	 */
+	public PageRank(double dampingFactor, double convergenceThreshold) {
+		this(dampingFactor, Integer.MAX_VALUE, convergenceThreshold);
+	}
+
+	/**
+	 * PageRank with a convergence threshold and a maximum iteration count. The
+	 * algorithm terminates after either the given number of iterations or when
+	 * the change in score over all vertices falls to or below the given
+	 * threshold value.
+	 *
+	 * @param dampingFactor probability of following an out-link, otherwise jump to a random vertex
+	 * @param maxIterations maximum number of iterations
+	 * @param convergenceThreshold convergence threshold for sum of scores
+	 */
+	public PageRank(double dampingFactor, int maxIterations, double convergenceThreshold) {
+		Preconditions.checkArgument(0 < dampingFactor && dampingFactor < 1,
+			"Damping factor must be between zero and one");
+		Preconditions.checkArgument(maxIterations > 0, "Number of iterations must be greater than zero");
+		Preconditions.checkArgument(convergenceThreshold > 0.0, "Convergence threshold must be greater than zero");
+
+		this.dampingFactor = dampingFactor;
+		this.maxIterations = maxIterations;
+		this.convergenceThreshold = convergenceThreshold;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public PageRank<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	protected String getAlgorithmName() {
+		return PageRank.class.getName();
+	}
+
+	@Override
+	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
+		Preconditions.checkNotNull(other);
+
+		if (!PageRank.class.isAssignableFrom(other.getClass())) {
+			return false;
+		}
+
+		PageRank rhs = (PageRank) other;
+
+		// merge configurations
+
+		maxIterations = Math.max(maxIterations, rhs.maxIterations);
+		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
+		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
+			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
+
+		return true;
+	}
+
+	@Override
+	public DataSet<Result<K>> runInternal(Graph<K, VV, EV> input)
+			throws Exception {
+		// vertex degree
+		DataSet<Vertex<K, Degrees>> vertexDegree = input
+			.run(new VertexDegrees<K, VV, EV>()
+				.setParallelism(parallelism));
+
+		// vertex count
+		DataSet<LongValue> vertexCount = GraphUtils.count(vertexDegree);
+
+		// s, t, d(s)
+		DataSet<Edge<K, LongValue>> edgeSourceDegree = input
+			.run(new EdgeSourceDegrees<K, VV, EV>()
+				.setParallelism(parallelism))
+			.map(new ExtractSourceDegree<K, EV>())
+				.setParallelism(parallelism)
+				.name("Extract source degree");
+
+		// vertices with zero in-edges
+		DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree
+			.flatMap(new InitializeSourceVertices<K>())
+			.withBroadcastSet(vertexCount, VERTEX_COUNT)
+				.setParallelism(parallelism)
+				.name("Initialize source vertex scores");
+
+		// s, initial pagerank(s)
+		DataSet<Tuple2<K, DoubleValue>> initialScores = vertexDegree
+			.map(new InitializeVertexScores<K>())
+			.withBroadcastSet(vertexCount, VERTEX_COUNT)
+				.setParallelism(parallelism)
+				.name("Initialize scores");
+
+		IterativeDataSet<Tuple2<K, DoubleValue>> iterative = initialScores
+			.iterate(maxIterations);
+
+		// s, projected pagerank(s)
+		DataSet<Tuple2<K, DoubleValue>> vertexScores = iterative
+			.coGroup(edgeSourceDegree)
+			.where(0)
+			.equalTo(0)
+			.with(new SendScore<K>())
+				.setParallelism(parallelism)
+				.name("Send score")
+			.groupBy(0)
+			.reduce(new SumScore<K>())
+			.setCombineHint(CombineHint.HASH)
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// ignored ID, total pagerank
+		DataSet<Tuple2<K, DoubleValue>> sumOfScores = vertexScores
+			.reduce(new SumVertexScores<K>())
+				.setParallelism(parallelism)
+				.name("Sum");
+
+		// s, adjusted pagerank(s)
+		DataSet<Tuple2<K, DoubleValue>> adjustedScores = vertexScores
+			.union(sourceVertices)
+				.setParallelism(parallelism)
+				.name("Union with source vertices")
+			.map(new AdjustScores<K>(dampingFactor))
+				.withBroadcastSet(sumOfScores, SUM_OF_SCORES)
+				.withBroadcastSet(vertexCount, VERTEX_COUNT)
+					.setParallelism(parallelism)
+					.name("Adjust scores");
+
+		DataSet<Tuple2<K, DoubleValue>> passThrough;
+
+		if (convergenceThreshold < Double.MAX_VALUE) {
+			passThrough = iterative
+				.join(adjustedScores)
+				.where(0)
+				.equalTo(0)
+				.with(new ChangeInScores<K>())
+					.setParallelism(parallelism)
+					.name("Change in scores");
+
+			iterative.registerAggregationConvergenceCriterion(CHANGE_IN_SCORES, new DoubleSumAggregator(), new ScoreConvergence(convergenceThreshold));
+		} else {
+			passThrough = adjustedScores;
+		}
+
+		return iterative
+			.closeWith(passThrough)
+			.map(new TranslateResult<K>())
+				.setParallelism(parallelism)
+				.name("Map result");
+	}
+
+	/**
+	 * Remove the unused original edge value and extract the out-degree.
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	@ForwardedFields("0; 1")
+	private static class ExtractSourceDegree<T, ET>
+	implements MapFunction<Edge<T, Tuple2<ET, Degrees>>, Edge<T, LongValue>> {
+		Edge<T, LongValue> output = new Edge<>();
+
+		@Override
+		public Edge<T, LongValue> map(Edge<T, Tuple2<ET, Degrees>> edge)
+				throws Exception {
+			output.f0 = edge.f0;
+			output.f1 = edge.f1;
+			output.f2 = edge.f2.f1.getOutDegree();
+			return output;
+		}
+	}
+
+	/**
+	 * Source vertices have no in-edges so have a projected score of 0.0.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0")
+	private static class InitializeSourceVertices<T>
+	implements FlatMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> {
+		private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue(0.0));
+
+		@Override
+		public void flatMap(Vertex<T, Degrees> vertex, Collector<Tuple2<T, DoubleValue>> out)
+				throws Exception {
+			if (vertex.f1.getInDegree().getValue() == 0) {
+				output.f0 = vertex.f0;
+				out.collect(output);
+			}
+		}
+	}
+
+	/**
+	 * PageRank scores sum to 1.0 so initialize each vertex with the inverse of
+	 * the number of vertices.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0")
+	private static class InitializeVertexScores<T>
+	extends RichMapFunction<Vertex<T, Degrees>, Tuple2<T, DoubleValue>> {
+		private Tuple2<T, DoubleValue> output = new Tuple2<>();
+
+		@Override
+		public void open(Configuration parameters)
+				throws Exception {
+			super.open(parameters);
+
+			Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
+			output.f1 = new DoubleValue(1.0 / vertexCount.iterator().next().getValue());
+		}
+
+		@Override
+		public Tuple2<T, DoubleValue> map(Vertex<T, Degrees> vertex)
+				throws Exception {
+			output.f0 = vertex.f0;
+			return output;
+		}
+	}
+
+	/**
+	 * The PageRank score for each vertex is divided evenly and projected to
+	 * neighbors on out-edges.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFieldsSecond("1->0")
+	private static class SendScore<T>
+	implements CoGroupFunction<Tuple2<T, DoubleValue>, Edge<T, LongValue>, Tuple2<T, DoubleValue>> {
+		private Tuple2<T, DoubleValue> output = new Tuple2<>(null, new DoubleValue());
+
+		@Override
+		public void coGroup(Iterable<Tuple2<T, DoubleValue>> vertex, Iterable<Edge<T, LongValue>> edges, Collector<Tuple2<T, DoubleValue>> out)
+				throws Exception {
+			Iterator<Edge<T, LongValue>> edgeIterator = edges.iterator();
+
+			if (edgeIterator.hasNext()) {
+				Edge<T, LongValue> edge = edgeIterator.next();
+
+				output.f0 = edge.f1;
+				output.f1.setValue(vertex.iterator().next().f1.getValue() / edge.f2.getValue());
+				out.collect(output);
+
+				while (edgeIterator.hasNext()) {
+					edge = edgeIterator.next();
+					output.f0 = edge.f1;
+					out.collect(output);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Sum the PageRank score over all vertices. The vertex ID must be ignored
+	 * but is retained rather than adding another operator.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0")
+	private static class SumVertexScores<T>
+	implements ReduceFunction<Tuple2<T, DoubleValue>> {
+		@Override
+		public Tuple2<T, DoubleValue> reduce(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second)
+				throws Exception {
+			first.f1.setValue(first.f1.getValue() + second.f1.getValue());
+			return first;
+		}
+	}
+
+	/**
+	 * Each iteration the per-vertex scores are adjusted with the damping
+	 * factor. Each score is multiplied by the damping factor then added to the
+	 * probability of a "random hop", which is one minus the damping factor.
+	 *
+	 * <p>This operation also accounts for 'sink' vertices, which have no
+	 * out-edges to project score to. The sink scores are computed by taking
+	 * one minus the sum of vertex scores, which also includes precision error.
+	 * This 'missing' score is evenly distributed across vertices as with the
+	 * random hop.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0")
+	private static class AdjustScores<T>
+	extends RichMapFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
+		private double dampingFactor;
+
+		private long vertexCount;
+
+		private double uniformlyDistributedScore;
+
+		public AdjustScores(double dampingFactor) {
+			this.dampingFactor = dampingFactor;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			Collection<Tuple2<T, DoubleValue>> sumOfScores = getRuntimeContext().getBroadcastVariable(SUM_OF_SCORES);
+			// floating point precision error is also included in sumOfSinks
+			double sumOfSinks = 1 - sumOfScores.iterator().next().f1.getValue();
+
+			Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
+			this.vertexCount = vertexCount.iterator().next().getValue();
+
+			this.uniformlyDistributedScore = ((1 - dampingFactor) + dampingFactor * sumOfSinks) / this.vertexCount;
+		}
+
+		@Override
+		public Tuple2<T, DoubleValue> map(Tuple2<T, DoubleValue> value) throws Exception {
+			value.f1.setValue(uniformlyDistributedScore + (dampingFactor * value.f1.getValue()));
+			return value;
+		}
+	}
+
+	/**
+	 * Computes the sum of the absolute change in vertex PageRank scores
+	 * between iterations.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFieldsFirst("0")
+	@ForwardedFieldsSecond("*")
+	private static class ChangeInScores<T>
+	extends RichJoinFunction<Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>, Tuple2<T, DoubleValue>> {
+		private double changeInScores;
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
+
+			changeInScores = 0.0;
+		}
+
+		@Override
+		public void close()
+				throws Exception {
+			super.close();
+
+			DoubleSumAggregator agg = getIterationRuntimeContext().getIterationAggregator(CHANGE_IN_SCORES);
+			agg.aggregate(changeInScores);
+		}
+
+		@Override
+		public Tuple2<T, DoubleValue> join(Tuple2<T, DoubleValue> first, Tuple2<T, DoubleValue> second)
+				throws Exception {
+			changeInScores += Math.abs(second.f1.getValue() - first.f1.getValue());
+			return second;
+		}
+	}
+
+	/**
+	 * Monitors the sum of the absolute change in vertex scores. The algorithm
+	 * terminates when the change in scores compared against the prior iteration
+	 * falls to or below the given convergence threshold.
+	 */
+	private static class ScoreConvergence
+	implements ConvergenceCriterion<DoubleValue> {
+		private double convergenceThreshold;
+
+		public ScoreConvergence(double convergenceThreshold) {
+			this.convergenceThreshold = convergenceThreshold;
+		}
+
+		@Override
+		public boolean isConverged(int iteration, DoubleValue value) {
+			double val = value.getValue();
+			return (val <= convergenceThreshold);
+		}
+	}
+
+	/**
+	 * Map the Tuple result to the return type.
+	 *
+	 * @param <T> ID type
+	 */
+	@ForwardedFields("0; 1")
+	private static class TranslateResult<T>
+		implements MapFunction<Tuple2<T, DoubleValue>, Result<T>> {
+		private Result<T> output = new Result<>();
+
+		@Override
+		public Result<T> map(Tuple2<T, DoubleValue> value) throws Exception {
+			output.f0 = value.f0;
+			output.f1 = value.f1;
+			return output;
+		}
+	}
+
+	/**
+	 * Wraps the {@link Tuple2} to encapsulate results from the PageRank algorithm.
+	 *
+	 * @param <T> ID type
+	 */
+	public static class Result<T>
+	extends Tuple2<T, DoubleValue>
+	implements PrintableResult, UnaryResult<T> {
+		public static final int HASH_SEED = 0x4010af29;
+
+		private MurmurHash hasher = new MurmurHash(HASH_SEED);
+
+		@Override
+		public T getVertexId0() {
+			return f0;
+		}
+
+		@Override
+		public void setVertexId0(T value) {
+			f0 = value;
+		}
+
+		/**
+		 * Get the PageRank score.
+		 *
+		 * @return the PageRank score
+		 */
+		public DoubleValue getPageRankScore() {
+			return f1;
+		}
+
+		@Override
+		public String toPrintableString() {
+			return "Vertex ID: " + getVertexId0()
+				+ ", PageRank score: " + getPageRankScore();
+		}
+
+		@Override
+		public int hashCode() {
+			return hasher.reset()
+				.hash(f0.hashCode())
+				.hash(f1.getValue())
+				.hash();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
index d2eeb41..ff4ad48 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/ChecksumHashCode.java
@@ -18,9 +18,9 @@
 
 package org.apache.flink.graph.library.metric;
 
-import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 
@@ -35,7 +35,7 @@ import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
  * @param <EV> edge value type
  */
 public class ChecksumHashCode<K, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Checksum> {
+extends GraphAnalyticBase<K, VV, EV, Checksum> {
 
 	private org.apache.flink.graph.asm.dataset.ChecksumHashCode<Vertex<K, VV>> vertexChecksum;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index 82cc607..30d3563 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.graph.library.metric.directed;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.accumulators.LongMaximum;
 import org.apache.flink.api.common.functions.FlatMapFunction;
@@ -31,10 +29,10 @@ import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.asm.result.PrintableResult;
@@ -42,13 +40,16 @@ import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import java.io.IOException;
 import java.text.NumberFormat;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Compute the following edge metrics in a directed graph:
+ * Compute the following edge metrics in a directed graph.
  *  - number of triangle triplets
  *  - number of rectangle triplets
  *  - maximum number of triangle triplets
@@ -59,7 +60,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeMetrics<K extends Comparable<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private static final String TRIANGLE_TRIPLET_COUNT = "triangleTripletCount";
 
@@ -88,11 +89,11 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 	/*
 	 * Implementation notes:
 	 *
-	 * Use aggregator to replace SumEdgeStats when aggregators are rewritten to use
-	 *   a hash-combineable hashable-reduce.
+	 * <p>Use aggregator to replace SumEdgeStats when aggregators are rewritten to use
+	 * a hash-combineable hashable-reduce.
 	 *
-	 * Use distinct to replace ReduceEdgeStats when the combiner can be disabled
-	 *   with a sorted-reduce forced.
+	 * <p>Use distinct to replace ReduceEdgeStats when the combiner can be disabled
+	 * with a sorted-reduce forced.
 	 */
 
 	@Override
@@ -147,7 +148,7 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 	 * the target vertex ID, the source degrees, and the low-order count. The
 	 * second tuple is the same with the source and target roles reversed.
 	 *
-	 * The low-order count is one if the source vertex degree is less than the
+	 * <p>The low-order count is one if the source vertex degree is less than the
 	 * target vertex degree or if the degrees are equal and the source vertex
 	 * ID compares lower than the target vertex ID; otherwise the low-order
 	 * count is zero.
@@ -346,11 +347,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 		@Override
 		public boolean equals(Object obj) {
-			if (obj == null) { return false; }
-			if (obj == this) { return true; }
-			if (obj.getClass() != getClass()) { return false; }
+			if (obj == null) {
+				return false;
+			}
+
+			if (obj == this) {
+				return true;
+			}
+
+			if (obj.getClass() != getClass()) {
+				return false;
+			}
 
-			Result rhs = (Result)obj;
+			Result rhs = (Result) obj;
 
 			return new EqualsBuilder()
 				.append(triangleTripletCount, rhs.triangleTripletCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 9764f6b..3931f65 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -18,27 +18,28 @@
 
 package org.apache.flink.graph.library.metric.directed;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.accumulators.LongMaximum;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.metric.directed.VertexMetrics.Result;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import java.io.IOException;
 import java.text.NumberFormat;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Compute the following vertex metrics in a directed graph:
+ * Compute the following vertex metrics in a directed graph.
  *  - number of vertices
  *  - number of edges
  *  - number of unidirectional edges
@@ -55,7 +56,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexMetrics<K extends Comparable<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private static final String VERTEX_COUNT = "vertexCount";
 
@@ -255,25 +256,25 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		/**
 		 * Get the average degree, the average number of in- plus out-edges per vertex.
 		 *
-		 * A result of {@code Float.NaN} is returned for an empty graph for
+		 * <p>A result of {@code Float.NaN} is returned for an empty graph for
 		 * which both the number of edges and number of vertices is zero.
 		 *
 		 * @return average degree
 		 */
 		public double getAverageDegree() {
-			return vertexCount == 0 ? Double.NaN : getNumberOfEdges() / (double)vertexCount;
+			return vertexCount == 0 ? Double.NaN : getNumberOfEdges() / (double) vertexCount;
 		}
 
 		/**
 		 * Get the density, the ratio of actual to potential edges between vertices.
 		 *
-		 * A result of {@code Float.NaN} is returned for a graph with fewer than
+		 * <p>A result of {@code Float.NaN} is returned for a graph with fewer than
 		 * two vertices for which the number of edges is zero.
 		 *
 		 * @return density
 		 */
 		public double getDensity() {
-			return vertexCount <= 1 ? Double.NaN : getNumberOfEdges() / (double)(vertexCount*(vertexCount-1));
+			return vertexCount <= 1 ? Double.NaN : getNumberOfEdges() / (double) (vertexCount * (vertexCount - 1));
 		}
 
 		/**
@@ -358,11 +359,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 		@Override
 		public boolean equals(Object obj) {
-			if (obj == null) { return false; }
-			if (obj == this) { return true; }
-			if (obj.getClass() != getClass()) { return false; }
+			if (obj == null) {
+				return false;
+			}
+
+			if (obj == this) {
+				return true;
+			}
+
+			if (obj.getClass() != getClass()) {
+				return false;
+			}
 
-			Result rhs = (Result)obj;
+			Result rhs = (Result) obj;
 
 			return new EqualsBuilder()
 				.append(vertexCount, rhs.vertexCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 31f01d8..d1c7fb7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.graph.library.metric.undirected;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.accumulators.LongMaximum;
 import org.apache.flink.api.common.functions.MapFunction;
@@ -28,22 +26,25 @@ import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result;
 import org.apache.flink.types.LongValue;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import java.io.IOException;
 import java.text.NumberFormat;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Compute the following edge metrics in an undirected graph:
+ * Compute the following edge metrics in an undirected graph.
  *  - number of triangle triplets
  *  - number of rectangle triplets
  *  - maximum number of triangle triplets
@@ -54,7 +55,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class EdgeMetrics<K extends Comparable<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private static final String TRIANGLE_TRIPLET_COUNT = "triangleTripletCount";
 
@@ -101,8 +102,8 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 	/*
 	 * Implementation notes:
 	 *
-	 * Use aggregator to replace SumEdgeStats when aggregators are rewritten to use
-	 *   a hash-combineable hashed-reduce.
+	 * <p>Use aggregator to replace SumEdgeStats when aggregators are rewritten to use
+	 * a hash-combineable hashed-reduce.
 	 */
 
 	@Override
@@ -319,11 +320,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 		@Override
 		public boolean equals(Object obj) {
-			if (obj == null) { return false; }
-			if (obj == this) { return true; }
-			if (obj.getClass() != getClass()) { return false; }
+			if (obj == null) {
+				return false;
+			}
+
+			if (obj == this) {
+				return true;
+			}
+
+			if (obj.getClass() != getClass()) {
+				return false;
+			}
 
-			Result rhs = (Result)obj;
+			Result rhs = (Result) obj;
 
 			return new EqualsBuilder()
 				.append(triangleTripletCount, rhs.triangleTripletCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index dd2411e..000f4e0 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -18,27 +18,28 @@
 
 package org.apache.flink.graph.library.metric.undirected;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.accumulators.LongMaximum;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.AbstractGraphAnalytic;
 import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result;
 import org.apache.flink.types.LongValue;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import java.io.IOException;
 import java.text.NumberFormat;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
- * Compute the following vertex metrics in an undirected graph:
+ * Compute the following vertex metrics in an undirected graph.
  *  - number of vertices
  *  - number of edges
  *  - average degree
@@ -51,7 +52,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <EV> edge value type
  */
 public class VertexMetrics<K extends Comparable<K>, VV, EV>
-extends AbstractGraphAnalytic<K, VV, EV, Result> {
+extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private static final String VERTEX_COUNT = "vertexCount";
 
@@ -222,26 +223,26 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 		/**
 		 * Get the average degree, the average number of edges per vertex.
 		 *
-		 * A result of {@code Float.NaN} is returned for an empty graph for
+		 * <p>A result of {@code Float.NaN} is returned for an empty graph for
 		 * which both the number of edges and number of vertices is zero.
 		 *
 		 * @return average degree
 		 */
 		public double getAverageDegree() {
 			// each edge is incident on two vertices
-			return vertexCount == 0 ? Double.NaN : 2 * edgeCount / (double)vertexCount;
+			return vertexCount == 0 ? Double.NaN : 2 * edgeCount / (double) vertexCount;
 		}
 
 		/**
 		 * Get the density, the ratio of actual to potential edges between vertices.
 		 *
-		 * A result of {@code Float.NaN} is returned for a graph with fewer than
+		 * <p>A result of {@code Float.NaN} is returned for a graph with fewer than
 		 * two vertices for which the number of edges is zero.
 		 *
 		 * @return density
 		 */
 		public double getDensity() {
-			return vertexCount <= 1 ? Double.NaN : edgeCount / (double)(vertexCount*(vertexCount-1)/2);
+			return vertexCount <= 1 ? Double.NaN : edgeCount / (double) (vertexCount * (vertexCount - 1) / 2);
 		}
 
 		/**
@@ -301,11 +302,19 @@ extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 		@Override
 		public boolean equals(Object obj) {
-			if (obj == null) { return false; }
-			if (obj == this) { return true; }
-			if (obj.getClass() != getClass()) { return false; }
+			if (obj == null) {
+				return false;
+			}
+
+			if (obj == this) {
+				return true;
+			}
+
+			if (obj.getClass() != getClass()) {
+				return false;
+			}
 
-			Result rhs = (Result)obj;
+			Result rhs = (Result) obj;
 
 			return new EqualsBuilder()
 				.append(vertexCount, rhs.vertexCount)

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 7df3235..e1bda93 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -37,7 +37,7 @@ import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.asm.result.BinaryResult;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.similarity.AdamicAdar.Result;
-import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.MurmurHash;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.FloatValue;
@@ -54,17 +54,17 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
- * <p>
- * Adamic-Adar measures the similarity between pairs of vertices as the sum of
+ *
+ * <p>Adamic-Adar measures the similarity between pairs of vertices as the sum of
  * the inverse logarithm of degree over shared neighbors. Scores are non-negative
  * and unbounded. A vertex with higher degree has greater overall influence but
  * is less influential to each pair of neighbors.
- * <p>
- * This implementation produces similarity scores for each pair of vertices
+ *
+ * <p>This implementation produces similarity scores for each pair of vertices
  * in the graph with at least one shared neighbor; equivalently, this is the
  * set of all non-zero Adamic-Adar coefficients.
- * <p>
- * The input graph must be a simple, undirected graph containing no duplicate
+ *
+ * <p>The input graph must be a simple, undirected graph containing no duplicate
  * edges or self-loops.
  *
  * @param <K> graph ID type
@@ -137,7 +137,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! AdamicAdar.class.isAssignableFrom(other.getClass())) {
+		if (!AdamicAdar.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 
@@ -254,7 +254,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 			long degree = value.f1.getValue();
 			// when the degree is one the logarithm is zero so avoid dividing by this value
-			float inverseLogDegree = (degree == 1) ? 0.0f : 1.0f / (float)Math.log(value.f1.getValue());
+			float inverseLogDegree = (degree == 1) ? 0.0f : 1.0f / (float) Math.log(value.f1.getValue());
 			output.f2.setValue(inverseLogDegree);
 
 			return output;
@@ -266,7 +266,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 *
 	 * @param <T> ID type
 	 */
-	@ForwardedFields("0->1; 1->2 ; 2->3")
+	@ForwardedFields("0->1; 1->2; 2->3")
 	private static class GenerateGroupSpans<T>
 	implements GroupReduceFunction<Tuple3<T, T, FloatValue>, Tuple4<IntValue, T, T, FloatValue>> {
 		private IntValue groupSpansValue = new IntValue();
@@ -309,7 +309,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				throws Exception {
 			int spans = value.f0.getValue();
 
-			for (int idx = 0 ; idx < spans ; idx++ ) {
+			for (int idx = 0; idx < spans; idx++) {
 				value.f0.setValue(idx);
 				out.collect(value);
 			}
@@ -339,16 +339,16 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				output.f1 = edge.f2;
 				output.f2 = edge.f3;
 
-				for (int i = 0 ; i < visitedCount ; i++) {
+				for (int i = 0; i < visitedCount; i++) {
 					output.f0 = visited.get(i);
 					out.collect(output);
 				}
 
 				if (visitedCount < GROUP_SIZE) {
-					if (! initialized) {
+					if (!initialized) {
 						initialized = true;
 
-						for (int i = 0 ; i < GROUP_SIZE ; i++) {
+						for (int i = 0; i < GROUP_SIZE; i++) {
 							visited.add(edge.f2.copy());
 						}
 					} else {
@@ -451,7 +451,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	implements PrintableResult, BinaryResult<T>, Comparable<Result<T>> {
 		public static final int HASH_SEED = 0xe405f6d1;
 
-		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+		private MurmurHash hasher = new MurmurHash(HASH_SEED);
 
 		/**
 		 * No-args constructor.
@@ -482,7 +482,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 		/**
 		 * Get the Adamic-Adar score, equal to the sum over common neighbors of
-		 * the inverse logarithm of degree
+		 * the inverse logarithm of degree.
 		 *
 		 * @return Adamic-Adar score
 		 */

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index b3e69f1..35217c6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -32,7 +32,7 @@ import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeTargetDegree;
 import org.apache.flink.graph.asm.result.BinaryResult;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.library.similarity.JaccardIndex.Result;
-import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.MurmurHash;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.IntValue;
@@ -50,12 +50,12 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * is computed as the number of shared neighbors divided by the number of
  * distinct neighbors. Scores range from 0.0 (no shared neighbors) to 1.0 (all
  * neighbors are shared).
- * <p>
- * This implementation produces similarity scores for each pair of vertices
+ *
+ * <p>This implementation produces similarity scores for each pair of vertices
  * in the graph with at least one shared neighbor; equivalently, this is the
  * set of all non-zero Jaccard Similarity coefficients.
- * <p>
- * The input graph must be a simple, undirected graph containing no duplicate
+ *
+ * <p>The input graph must be a simple, undirected graph containing no duplicate
  * edges or self-loops.
  *
  * @param <K> graph ID type
@@ -89,7 +89,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 * pairs. Small groups generate more data whereas large groups distribute
 	 * computation less evenly among tasks.
 	 *
-	 * The default value should be near-optimal for all use cases.
+	 * <p>The default value should be near-optimal for all use cases.
 	 *
 	 * @param groupSize the group size for the quadratic expansion of neighbor pairs
 	 * @return this
@@ -180,7 +180,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! JaccardIndex.class.isAssignableFrom(other.getClass())) {
+		if (!JaccardIndex.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 
@@ -267,11 +267,11 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	 * This is the first of three operations implementing a self-join to generate
 	 * the full neighbor pairing for each vertex. The number of neighbor pairs
 	 * is (n choose 2) which is quadratic in the vertex degree.
-	 * <p>
-	 * The third operation, {@link GenerateGroupPairs}, processes groups of size
+	 *
+	 * <p>The third operation, {@link GenerateGroupPairs}, processes groups of size
 	 * {@link #groupSize} and emits {@code O(groupSize * deg(vertex))} pairs.
-	 * <p>
-	 * This input to the third operation is still quadratic in the vertex degree.
+	 *
+	 * <p>This input to the third operation is still quadratic in the vertex degree.
 	 * Two prior operations, {@link GenerateGroupSpans} and {@link GenerateGroups},
 	 * each emit datasets linear in the vertex degree, with a forced rebalance
 	 * in between. {@link GenerateGroupSpans} first annotates each edge with the
@@ -310,7 +310,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				// group span, u, v, d(v)
 				output.f1 = edge.f0;
 				output.f2 = edge.f1;
-				output.f3.setValue((int)degree);
+				output.f3.setValue((int) degree);
 
 				out.collect(output);
 
@@ -337,7 +337,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				throws Exception {
 			int spans = value.f0.getValue();
 
-			for (int idx = 0 ; idx < spans ; idx++ ) {
+			for (int idx = 0; idx < spans; idx++) {
 				value.f0.setValue(idx);
 				out.collect(value);
 			}
@@ -346,8 +346,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	/**
 	 * Emits the two-path for all neighbor pairs in this group.
-	 * <p>
-	 * The first {@link #groupSize} vertices are emitted pairwise. Following
+	 *
+	 * <p>The first {@link #groupSize} vertices are emitted pairwise. Following
 	 * vertices are only paired with vertices from this initial group.
 	 *
 	 * @param <T> ID type
@@ -373,7 +373,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			int visitedCount = 0;
 
 			for (Tuple4<IntValue, T, T, IntValue> edge : values) {
-				for (int i = 0 ; i < visitedCount ; i++) {
+				for (int i = 0; i < visitedCount; i++) {
 					Tuple3<T, T, IntValue> prior = visited.get(i);
 
 					prior.f1 = edge.f2;
@@ -384,7 +384,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 					if (degreeSum > Integer.MAX_VALUE) {
 						throw new RuntimeException("Degree sum overflows IntValue");
 					}
-					prior.f2.setValue((int)degreeSum);
+					prior.f2.setValue((int) degreeSum);
 
 					// v, w, d(v) + d(w)
 					out.collect(prior);
@@ -393,10 +393,10 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				}
 
 				if (visitedCount < groupSize) {
-					if (! initialized) {
+					if (!initialized) {
 						initialized = true;
 
-						for (int i = 0 ; i < groupSize ; i++) {
+						for (int i = 0; i < groupSize; i++) {
 							Tuple3<T, T, IntValue> tuple = new Tuple3<>();
 
 							tuple.f0 = edge.f2.copy();
@@ -506,7 +506,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	implements PrintableResult, BinaryResult<T>, Comparable<Result<T>> {
 		public static final int HASH_SEED = 0x731f73e7;
 
-		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+		private MurmurHash hasher = new MurmurHash(HASH_SEED);
 
 		public Result() {
 			f2 = new IntValue();
@@ -584,8 +584,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			// exact comparison of a/b with x/y using only integer math:
 			// a/b <?> x/y == a*y <?> b*x
 
-			long ay = getSharedNeighborCount().getValue() * (long)o.getDistinctNeighborCount().getValue();
-			long bx = getDistinctNeighborCount().getValue() * (long)o.getSharedNeighborCount().getValue();
+			long ay = getSharedNeighborCount().getValue() * (long) o.getDistinctNeighborCount().getValue();
+			long bx = getDistinctNeighborCount().getValue() * (long) o.getSharedNeighborCount().getValue();
 
 			return Long.compare(ay, bx);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
index af25377..1f2d832 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/ComputeFunction.java
@@ -35,7 +35,7 @@ import java.util.Iterator;
 
 /**
  * The base class for the message-passing functions between vertices as a part of a {@link VertexCentricIteration}.
- * 
+ *
  * @param <K> The type of the vertex key (the vertex identifier).
  * @param <VV> The type of the vertex value (the state of the vertex).
  * @param <EV> The type of the values that are associated with the edges.
@@ -53,10 +53,10 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 	 * This method is invoked once per superstep, for each active vertex.
 	 * A vertex is active during a superstep, if at least one message was produced for it,
 	 * in the previous superstep. During the first superstep, all vertices are active.
-	 * <p>
-	 * This method can iterate over all received messages, set the new vertex value, and
+	 *
+	 * <p>This method can iterate over all received messages, set the new vertex value, and
 	 * send messages to other vertices (which will be delivered in the next superstep).
-	 * 
+	 *
 	 * @param vertex The vertex executing this function
 	 * @param messages The messages that were sent to this vertex in the previous superstep
 	 * @throws Exception
@@ -65,23 +65,22 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 
 	/**
 	 * This method is executed once per superstep before the vertex update function is invoked for each vertex.
-	 * 
+	 *
 	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
 	 */
 	public void preSuperstep() throws Exception {}
-	
+
 	/**
 	 * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
-	 * 
+	 *
 	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
 	 */
 	public void postSuperstep() throws Exception {}
-	
-	
+
 	/**
 	 * Gets an {@link java.lang.Iterable} with all out-going edges. This method is mutually exclusive with
 	 * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
-	 * 
+	 *
 	 * @return An iterator with all edges.
 	 */
 	public final Iterable<Edge<K, EV>> getEdges() {
@@ -93,7 +92,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 	/**
 	 * Sends the given message to all vertices that adjacent to the changed vertex.
 	 * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
-	 * 
+	 *
 	 * @param m The message to send.
 	 */
 	public final void sendMessageToAllNeighbors(Message m) {
@@ -105,11 +104,11 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 			out.collect(Either.Right(outMsg));
 		}
 	}
-	
+
 	/**
 	 * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
 	 * the next superstep will cause an exception due to a non-deliverable message.
-	 * 
+	 *
 	 * @param target The key (id) of the target vertex to message.
 	 * @param m The message.
 	 */
@@ -124,12 +123,12 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 	/**
 	 * Sets the new value of this vertex.
 	 *
-	 * This should be called at most once per ComputeFunction.
-	 * 
+	 * <p>This should be called at most once per ComputeFunction.
+	 *
 	 * @param newValue The new vertex value.
 	 */
 	public final void setNewVertexValue(VV newValue) {
-		if(setNewVertexValueCalled) {
+		if (setNewVertexValueCalled) {
 			throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
 		}
 		setNewVertexValueCalled = true;
@@ -138,43 +137,44 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 
 		out.collect(Either.Left(outVertex));
 	}
+
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * Gets the number of the superstep, starting at <tt>1</tt>.
-	 * 
+	 *
 	 * @return The number of the current superstep.
 	 */
 	public final int getSuperstepNumber() {
 		return this.runtimeContext.getSuperstepNumber();
 	}
-	
+
 	/**
 	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
 	 * all aggregates globally once per superstep and makes them available in the next superstep.
-	 * 
+	 *
 	 * @param name The name of the aggregator.
 	 * @return The aggregator registered under this name, or {@code null}, if no aggregator was registered.
 	 */
 	public final <T extends Aggregator<?>> T getIterationAggregator(String name) {
 		return this.runtimeContext.getIterationAggregator(name);
 	}
-	
+
 	/**
 	 * Get the aggregated value that an aggregator computed in the previous iteration.
-	 * 
+	 *
 	 * @param name The name of the aggregator.
 	 * @return The aggregated value of the previous iteration.
 	 */
 	public final <T extends Value> T getPreviousIterationAggregate(String name) {
 		return this.runtimeContext.getPreviousIterationAggregate(name);
 	}
-	
+
 	/**
 	 * Gets the broadcast data set registered under the given name. Broadcast data sets
 	 * are available on all parallel instances of a function. They can be registered via
 	 * {@link org.apache.flink.graph.pregel.VertexCentricConfiguration#addBroadcastSet(String, DataSet)}.
-	 * 
+	 *
 	 * @param name The name under which the broadcast set is registered.
 	 * @return The broadcast data set.
 	 */
@@ -189,26 +189,26 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 	private Vertex<K, VV> outVertex;
 
 	private Tuple2<K, Message> outMsg;
-	
+
 	private IterationRuntimeContext runtimeContext;
-	
+
 	private Iterator<Edge<K, EV>> edges;
-	
+
 	private Collector<Either<?, ?>> out;
-	
+
 	private EdgesIterator<K, EV> edgeIterator;
-	
+
 	private boolean edgesUsed;
 
 	private boolean setNewVertexValueCalled;
-	
+
 	void init(IterationRuntimeContext context) {
 		this.runtimeContext = context;
 		this.outVertex = new Vertex<>();
 		this.outMsg = new Tuple2<>();
 		this.edgeIterator = new EdgesIterator<>();
 	}
-	
+
 	@SuppressWarnings("unchecked")
 	void set(K vertexId, Iterator<Edge<K, EV>> edges,
 			Collector<Either<Vertex<K, VV>, Tuple2<K, Message>>> out) {
@@ -228,17 +228,17 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 		edgesUsed = true;
 	}
 
-	private static final class EdgesIterator<K, EV> 
-		implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
-	{
+	private static final class EdgesIterator<K, EV>
+		implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>> {
+
 		private Iterator<Edge<K, EV>> input;
-		
+
 		private Edge<K, EV> edge = new Edge<>();
 
 		void set(Iterator<Edge<K, EV>> input) {
 			this.input = input;
 		}
-		
+
 		@Override
 		public boolean hasNext() {
 			return input.hasNext();
@@ -257,6 +257,7 @@ public abstract class ComputeFunction<K, VV, EV, Message> implements Serializabl
 		public void remove() {
 			throw new UnsupportedOperationException();
 		}
+
 		@Override
 		public Iterator<Edge<K, EV>> iterator() {
 			return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
index 6e51a3a..204bc9a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageCombiner.java
@@ -27,7 +27,7 @@ import java.io.Serializable;
 
 /**
  * The base class for combining messages sent during a {@link VertexCentricIteration}.
- * 
+ *
  * @param <K> The type of the vertex id
  * @param <Message> The type of the message sent between vertices along the edges.
  */
@@ -49,7 +49,7 @@ public abstract class MessageCombiner<K, Message> implements Serializable {
 	 * Combines messages sent from different vertices to a target vertex.
 	 * Implementing this method might reduce communication costs during a vertex-centric
 	 * iteration.
-	 * 
+	 *
 	 * @param messages the input messages to combine
 	 * @throws Exception
 	 */
@@ -57,7 +57,7 @@ public abstract class MessageCombiner<K, Message> implements Serializable {
 
 	/**
 	 * Sends the combined message to the target vertex.
-	 * 
+	 *
 	 * @param combinedMessage
 	 * @throws Exception
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
index 5a17b90..f8dd926 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/MessageIterator.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.graph.pregel;
 
-import java.util.Iterator;
-
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.types.Either;
 import org.apache.flink.types.NullValue;
 
+import java.util.Iterator;
+
 /**
  * An iterator that returns messages. The iterator is {@link java.lang.Iterable} at the same time to support
  * the <i>foreach</i> syntax.
@@ -33,17 +33,17 @@ public final class MessageIterator<Message> implements Iterator<Message>, Iterab
 
 	private transient Iterator<Tuple2<?, Either<NullValue, Message>>> source;
 	private Message first = null;
-	
-	final void setSource(Iterator<Tuple2<?, Either<NullValue, Message>>> source) {
+
+	void setSource(Iterator<Tuple2<?, Either<NullValue, Message>>> source) {
 		this.source = source;
 	}
 
-	final void setFirst(Message msg) {
+	void setFirst(Message msg) {
 		this.first = msg;
 	}
-	
+
 	@Override
-	public final boolean hasNext() {
+	public boolean hasNext() {
 		if (first != null) {
 			return true;
 		}
@@ -51,9 +51,9 @@ public final class MessageIterator<Message> implements Iterator<Message>, Iterab
 			return ((this.source != null) && (this.source.hasNext()));
 		}
 	}
-	
+
 	@Override
-	public final Message next() {
+	public Message next() {
 		if (first != null) {
 			Message toReturn = first;
 			first = null;
@@ -63,7 +63,7 @@ public final class MessageIterator<Message> implements Iterator<Message>, Iterab
 	}
 
 	@Override
-	public final void remove() {
+	public void remove() {
 		throw new UnsupportedOperationException();
 	}
 
@@ -71,4 +71,4 @@ public final class MessageIterator<Message> implements Iterator<Message>, Iterab
 	public Iterator<Message> iterator() {
 		return this;
 	}
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
index a0f793a..69fcc52 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/pregel/VertexCentricConfiguration.java
@@ -30,13 +30,13 @@ import java.util.List;
  * degree of parallelism, to register aggregators and use broadcast sets in
  * the {@link org.apache.flink.graph.pregel.ComputeFunction}.
  *
- * The VertexCentricConfiguration object is passed as an argument to
+ * <p>The VertexCentricConfiguration object is passed as an argument to
  * {@link org.apache.flink.graph.Graph#runVertexCentricIteration (
  * org.apache.flink.graph.pregel.ComputeFunction, int, VertexCentricConfiguration)}.
  */
 public class VertexCentricConfiguration extends IterationConfiguration {
 
-	/** the broadcast variables for the compute function **/
+	/** The broadcast variables for the compute function. **/
 	private List<Tuple2<String, DataSet<?>>> bcVars = new ArrayList<>();
 
 	public VertexCentricConfiguration() {}