You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by ch...@apache.org on 2017/05/26 19:16:19 UTC

[10/15] flink git commit: [FLINK-6709] [gelly] Activate strict checkstyle for flink-gellies

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 71baaa9..670cefb 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -78,7 +78,7 @@ import java.util.NoSuchElementException;
  *
  * @see org.apache.flink.graph.Edge
  * @see org.apache.flink.graph.Vertex
- * 
+ *
  * @param <K> the key type for edge and vertex identifiers
  * @param <VV> the value type for vertices
  * @param <EV> the value type for edges
@@ -91,8 +91,8 @@ public class Graph<K, VV, EV> {
 	private final DataSet<Edge<K, EV>> edges;
 
 	/**
-	 * Creates a graph from two DataSets: vertices and edges
-	 * 
+	 * Creates a graph from two DataSets: vertices and edges.
+	 *
 	 * @param vertices a DataSet of vertices.
 	 * @param edges a DataSet of edges.
 	 * @param context the flink execution environment.
@@ -105,7 +105,7 @@ public class Graph<K, VV, EV> {
 
 	/**
 	 * Creates a graph from a Collection of vertices and a Collection of edges.
-	 * 
+	 *
 	 * @param vertices a Collection of vertices.
 	 * @param edges a Collection of edges.
 	 * @param context the flink execution environment.
@@ -122,7 +122,7 @@ public class Graph<K, VV, EV> {
 	 * Creates a graph from a Collection of edges.
 	 * Vertices are created automatically and their values are set to
 	 * NullValue.
-	 * 
+	 *
 	 * @param edges a Collection of edges.
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
@@ -135,12 +135,12 @@ public class Graph<K, VV, EV> {
 
 	/**
 	 * Creates a graph from a Collection of edges.
-	 * Vertices are created automatically and their values are set 
+	 * Vertices are created automatically and their values are set
 	 * by applying the provided map function to the vertex IDs.
-	 * 
+	 *
 	 * @param edges a Collection of edges.
 	 * @param vertexValueInitializer a map function that initializes the vertex values.
-	 * It allows to apply a map transformation on the vertex ID to produce an initial vertex value. 
+	 * It allows to apply a map transformation on the vertex ID to produce an initial vertex value.
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
 	 */
@@ -152,7 +152,7 @@ public class Graph<K, VV, EV> {
 
 	/**
 	 * Creates a graph from a DataSet of vertices and a DataSet of edges.
-	 * 
+	 *
 	 * @param vertices a DataSet of vertices.
 	 * @param edges a DataSet of edges.
 	 * @param context the flink execution environment.
@@ -168,7 +168,7 @@ public class Graph<K, VV, EV> {
 	 * Creates a graph from a DataSet of edges.
 	 * Vertices are created automatically and their values are set to
 	 * NullValue.
-	 * 
+	 *
 	 * @param edges a DataSet of edges.
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
@@ -201,7 +201,7 @@ public class Graph<K, VV, EV> {
 	 * Creates a graph from a DataSet of edges.
 	 * Vertices are created automatically and their values are set
 	 * by applying the provided map function to the vertex IDs.
-	 * 
+	 *
 	 * @param edges a DataSet of edges.
 	 * @param vertexValueInitializer the mapper function that initializes the vertex values.
 	 * It allows to apply a map transformation on the vertex ID to produce an initial vertex value.
@@ -251,15 +251,15 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Creates a graph from a DataSet of Tuple2 objects for vertices and 
+	 * Creates a graph from a DataSet of Tuple2 objects for vertices and
 	 * Tuple3 objects for edges.
-	 * <p>
-	 * The first field of the Tuple2 vertex object will become the vertex ID
+	 *
+	 * <p>The first field of the Tuple2 vertex object will become the vertex ID
 	 * and the second field will become the vertex value.
 	 * The first field of the Tuple3 object for edges will become the source ID,
 	 * the second field will become the target ID, and the third field will become
 	 * the edge value.
-	 * 
+	 *
 	 * @param vertices a DataSet of Tuple2 representing the vertices.
 	 * @param edges a DataSet of Tuple3 representing the edges.
 	 * @param context the flink execution environment.
@@ -281,13 +281,13 @@ public class Graph<K, VV, EV> {
 
 	/**
 	 * Creates a graph from a DataSet of Tuple3 objects for edges.
-	 * <p>
-	 * The first field of the Tuple3 object will become the source ID,
+	 *
+	 * <p>The first field of the Tuple3 object will become the source ID,
 	 * the second field will become the target ID, and the third field will become
 	 * the edge value.
-	 * <p>
-	 * Vertices are created automatically and their values are set to NullValue.
-	 * 
+	 *
+	 * <p>Vertices are created automatically and their values are set to NullValue.
+	 *
 	 * @param edges a DataSet of Tuple3 representing the edges.
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
@@ -304,14 +304,14 @@ public class Graph<K, VV, EV> {
 
 	/**
 	 * Creates a graph from a DataSet of Tuple3 objects for edges.
-	 * <p>
-	 * Each Tuple3 will become one Edge, where the source ID will be the first field of the Tuple2,
+	 *
+	 * <p>Each Tuple3 will become one Edge, where the source ID will be the first field of the Tuple2,
 	 * the target ID will be the second field of the Tuple2
 	 * and the Edge value will be the third field of the Tuple3.
-	 * <p>
-	 * Vertices are created automatically and their values are initialized
+	 *
+	 * <p>Vertices are created automatically and their values are initialized
 	 * by applying the provided vertexValueInitializer map function to the vertex IDs.
-	 * 
+	 *
 	 * @param edges a DataSet of Tuple3.
 	 * @param vertexValueInitializer the mapper function that initializes the vertex values.
 	 * It allows to apply a map transformation on the vertex ID to produce an initial vertex value.
@@ -332,9 +332,9 @@ public class Graph<K, VV, EV> {
 	 * Creates a graph from a DataSet of Tuple2 objects for edges.
 	 * Each Tuple2 will become one Edge, where the source ID will be the first field of the Tuple2
 	 * and the target ID will be the second field of the Tuple2.
-	 * <p>
-	 * Edge value types and Vertex values types will be set to NullValue.
-	 * 
+	 *
+	 * <p>Edge value types and Vertex values types will be set to NullValue.
+	 *
 	 * @param edges a DataSet of Tuple2.
 	 * @param context the flink execution environment.
 	 * @return the newly created graph.
@@ -353,10 +353,10 @@ public class Graph<K, VV, EV> {
 	 * Creates a graph from a DataSet of Tuple2 objects for edges.
 	 * Each Tuple2 will become one Edge, where the source ID will be the first field of the Tuple2
 	 * and the target ID will be the second field of the Tuple2.
-	 * <p>
-	 * Edge value types will be set to NullValue.
+	 *
+	 * <p>Edge value types will be set to NullValue.
 	 * Vertex values can be initialized by applying a user-defined map function on the vertex IDs.
-	 * 
+	 *
 	 * @param edges a DataSet of Tuple2, where the first field corresponds to the source ID
 	 * and the second field corresponds to the target ID.
 	 * @param vertexValueInitializer the mapper function that initializes the vertex values.
@@ -376,13 +376,13 @@ public class Graph<K, VV, EV> {
 
 	/**
 	* Creates a Graph from a CSV file of vertices and a CSV file of edges.
-	* 
+	*
 	* @param verticesPath path to a CSV file with the Vertex data.
 	* @param edgesPath path to a CSV file with the Edge data
 	* @param context the Flink execution environment.
-	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader}, 
+	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader},
 	* on which calling methods to specify types of the Vertex ID, Vertex value and Edge value returns a Graph.
-	* 
+	*
 	* @see org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)
 	* @see org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)
 	* @see org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)
@@ -392,14 +392,14 @@ public class Graph<K, VV, EV> {
 		return new GraphCsvReader(verticesPath, edgesPath, context);
 	}
 
-	/** 
+	/**
 	* Creates a graph from a CSV file of edges. Vertices will be created automatically.
 	*
 	* @param edgesPath a path to a CSV file with the Edges data
 	* @param context the execution environment.
 	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader},
 	* on which calling methods to specify types of the Vertex ID, Vertex value and Edge value returns a Graph.
-	* 
+	*
 	* @see org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)
 	* @see org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)
 	* @see org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)
@@ -409,7 +409,7 @@ public class Graph<K, VV, EV> {
 		return new GraphCsvReader(edgesPath, context);
 	}
 
-	/** 
+	/**
 	 * Creates a graph from a CSV file of edges. Vertices will be created automatically and
 	 * Vertex values can be initialized using a user-defined mapper.
 	 *
@@ -419,7 +419,7 @@ public class Graph<K, VV, EV> {
 	 * @param context the execution environment.
 	 * @return An instance of {@link org.apache.flink.graph.GraphCsvReader},
 	 * on which calling methods to specify types of the Vertex ID, Vertex Value and Edge value returns a Graph.
-	 * 
+	 *
 	 * @see org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)
 	 * @see org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)
 	 * @see org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)
@@ -440,7 +440,7 @@ public class Graph<K, VV, EV> {
 	/**
 	 * Function that checks whether a Graph is a valid Graph,
 	 * as defined by the given {@link GraphValidator}.
-	 * 
+	 *
 	 * @return true if the Graph is valid.
 	 */
 	public Boolean validate(GraphValidator<K, VV, EV> validator) throws Exception {
@@ -520,7 +520,7 @@ public class Graph<K, VV, EV> {
 
 	/**
 	 * Apply a function to the attribute of each vertex in the graph.
-	 * 
+	 *
 	 * @param mapper the map function to apply.
 	 * @return a new graph
 	 */
@@ -570,7 +570,7 @@ public class Graph<K, VV, EV> {
 
 	/**
 	 * Apply a function to the attribute of each edge in the graph.
-	 * 
+	 *
 	 * @param mapper the map function to apply.
 	 * @return a new graph
 	 */
@@ -659,19 +659,19 @@ public class Graph<K, VV, EV> {
 	 * Joins the vertex DataSet of this graph with an input Tuple2 DataSet and applies
 	 * a user-defined transformation on the values of the matched records.
 	 * The vertex ID and the first field of the Tuple2 DataSet are used as the join keys.
-	 * 
+	 *
 	 * @param inputDataSet the Tuple2 DataSet to join with.
 	 * The first field of the Tuple2 is used as the join key and the second field is passed
-	 * as a parameter to the transformation function. 
+	 * as a parameter to the transformation function.
 	 * @param vertexJoinFunction the transformation function to apply.
 	 * The first parameter is the current vertex value and the second parameter is the value
 	 * of the matched Tuple2 from the input DataSet.
 	 * @return a new Graph, where the vertex values have been updated according to the
 	 * result of the vertexJoinFunction.
-	 * 
+	 *
 	 * @param <T> the type of the second field of the input Tuple2 DataSet.
 	*/
-	public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> inputDataSet, 
+	public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> inputDataSet,
 			final VertexJoinFunction<VV, T> vertexJoinFunction) {
 
 		DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
@@ -714,10 +714,10 @@ public class Graph<K, VV, EV> {
 	 * Joins the edge DataSet with an input DataSet on the composite key of both
 	 * source and target IDs and applies a user-defined transformation on the values
 	 * of the matched records. The first two fields of the input DataSet are used as join keys.
-	 * 
+	 *
 	 * @param inputDataSet the DataSet to join with.
 	 * The first two fields of the Tuple3 are used as the composite join key
-	 * and the third field is passed as a parameter to the transformation function. 
+	 * and the third field is passed as a parameter to the transformation function.
 	 * @param edgeJoinFunction the transformation function to apply.
 	 * The first parameter is the current edge value and the second parameter is the value
 	 * of the matched Tuple3 from the input DataSet.
@@ -771,10 +771,10 @@ public class Graph<K, VV, EV> {
 	 * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
 	 * on the values of the matched records.
 	 * The source ID of the edges input and the first field of the input DataSet are used as join keys.
-	 * 
+	 *
 	 * @param inputDataSet the DataSet to join with.
 	 * The first field of the Tuple2 is used as the join key
-	 * and the second field is passed as a parameter to the transformation function. 
+	 * and the second field is passed as a parameter to the transformation function.
 	 * @param edgeJoinFunction the transformation function to apply.
 	 * The first parameter is the current edge value and the second parameter is the value
 	 * of the matched Tuple2 from the input DataSet.
@@ -834,10 +834,10 @@ public class Graph<K, VV, EV> {
 	 * Joins the edge DataSet with an input Tuple2 DataSet and applies a user-defined transformation
 	 * on the values of the matched records.
 	 * The target ID of the edges input and the first field of the input DataSet are used as join keys.
-	 * 
+	 *
 	 * @param inputDataSet the DataSet to join with.
 	 * The first field of the Tuple2 is used as the join key
-	 * and the second field is passed as a parameter to the transformation function. 
+	 * and the second field is passed as a parameter to the transformation function.
 	 * @param edgeJoinFunction the transformation function to apply.
 	 * The first parameter is the current edge value and the second parameter is the value
 	 * of the matched Tuple2 from the input DataSet.
@@ -859,7 +859,7 @@ public class Graph<K, VV, EV> {
 	/**
 	 * Apply filtering functions to the graph and return a sub-graph that
 	 * satisfies the predicates for both vertices and edges.
-	 * 
+	 *
 	 * @param vertexFilter the filter function for vertices.
 	 * @param edgeFilter the filter function for edges.
 	 * @return the resulting sub-graph.
@@ -881,7 +881,7 @@ public class Graph<K, VV, EV> {
 	/**
 	 * Apply a filtering function to the graph and return a sub-graph that
 	 * satisfies the predicates only for the vertices.
-	 * 
+	 *
 	 * @param vertexFilter the filter function for vertices.
 	 * @return the resulting sub-graph.
 	 */
@@ -900,7 +900,7 @@ public class Graph<K, VV, EV> {
 	/**
 	 * Apply a filtering function to the graph and return a sub-graph that
 	 * satisfies the predicates only for the edges.
-	 * 
+	 *
 	 * @param edgeFilter the filter function for edges.
 	 * @return the resulting sub-graph.
 	 */
@@ -919,8 +919,8 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Return the out-degree of all vertices in the graph
-	 * 
+	 * Return the out-degree of all vertices in the graph.
+	 *
 	 * @return A DataSet of {@code Tuple2<vertexId, outDegree>}
 	 */
 	public DataSet<Tuple2<K, LongValue>> outDegrees() {
@@ -946,7 +946,7 @@ public class Graph<K, VV, EV> {
 
 			Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
 
-			if(vertexIterator.hasNext()) {
+			if (vertexIterator.hasNext()) {
 				vertexDegree.f0 = vertexIterator.next().f0;
 				out.collect(vertexDegree);
 			} else {
@@ -956,8 +956,8 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Return the in-degree of all vertices in the graph
-	 * 
+	 * Return the in-degree of all vertices in the graph.
+	 *
 	 * @return A DataSet of {@code Tuple2<vertexId, inDegree>}
 	 */
 	public DataSet<Tuple2<K, LongValue>> inDegrees() {
@@ -967,8 +967,8 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Return the degree of all vertices in the graph
-	 * 
+	 * Return the degree of all vertices in the graph.
+	 *
 	 * @return A DataSet of {@code Tuple2<vertexId, degree>}
 	 */
 	public DataSet<Tuple2<K, LongValue>> getDegrees() {
@@ -979,7 +979,7 @@ public class Graph<K, VV, EV> {
 
 	/**
 	 * This operation adds all inverse-direction edges to the graph.
-	 * 
+	 *
 	 * @return the undirected graph.
 	 */
 	public Graph<K, VV, EV> getUndirected() {
@@ -993,10 +993,10 @@ public class Graph<K, VV, EV> {
 	 * Groups by vertex and computes a GroupReduce transformation over the edge values of each vertex.
 	 * The edgesFunction applied on the edges has access to both the id and the value
 	 * of the grouping vertex.
-	 * 
-	 * For each vertex, the edgesFunction can iterate over all edges of this vertex
+	 *
+	 * <p>For each vertex, the edgesFunction can iterate over all edges of this vertex
 	 * with the specified direction, and emit any number of output elements, including none.
-	 * 
+	 *
 	 * @param edgesFunction the group reduce function to apply to the neighboring edges of each vertex.
 	 * @param direction the edge direction (in-, out-, all-).
 	 * @param <T> the output type
@@ -1027,10 +1027,10 @@ public class Graph<K, VV, EV> {
 	 * Groups by vertex and computes a GroupReduce transformation over the edge values of each vertex.
 	 * The edgesFunction applied on the edges has access to both the id and the value
 	 * of the grouping vertex.
-	 * 
-	 * For each vertex, the edgesFunction can iterate over all edges of this vertex
+	 *
+	 * <p>For each vertex, the edgesFunction can iterate over all edges of this vertex
 	 * with the specified direction, and emit any number of output elements, including none.
-	 * 
+	 *
 	 * @param edgesFunction the group reduce function to apply to the neighboring edges of each vertex.
 	 * @param direction the edge direction (in-, out-, all-).
 	 * @param <T> the output type
@@ -1064,10 +1064,10 @@ public class Graph<K, VV, EV> {
 	 * Groups by vertex and computes a GroupReduce transformation over the edge values of each vertex.
 	 * The edgesFunction applied on the edges only has access to the vertex id (not the vertex value)
 	 * of the grouping vertex.
-	 * 
-	 * For each vertex, the edgesFunction can iterate over all edges of this vertex
+	 *
+	 * <p>For each vertex, the edgesFunction can iterate over all edges of this vertex
 	 * with the specified direction, and emit any number of output elements, including none.
-	 * 
+	 *
 	 * @param edgesFunction the group reduce function to apply to the neighboring edges of each vertex.
 	 * @param direction the edge direction (in-, out-, all-).
 	 * @param <T> the output type
@@ -1089,10 +1089,10 @@ public class Graph<K, VV, EV> {
 	 * Groups by vertex and computes a GroupReduce transformation over the edge values of each vertex.
 	 * The edgesFunction applied on the edges only has access to the vertex id (not the vertex value)
 	 * of the grouping vertex.
-	 * 
-	 * For each vertex, the edgesFunction can iterate over all edges of this vertex
+	 *
+	 * <p>For each vertex, the edgesFunction can iterate over all edges of this vertex
 	 * with the specified direction, and emit any number of output elements, including none.
-	 * 
+	 *
 	 * @param edgesFunction the group reduce function to apply to the neighboring edges of each vertex.
 	 * @param direction the edge direction (in-, out-, all-).
 	 * @param <T> the output type
@@ -1207,12 +1207,13 @@ public class Graph<K, VV, EV> {
 
 			Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
 
-			if(vertexIterator.hasNext()) {
+			if (vertexIterator.hasNext()) {
 				function.iterateEdges(vertexIterator.next(), edges, out);
 			} else {
 				throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
 			}
 		}
+
 		@Override
 		public TypeInformation<T> getProducedType() {
 			return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3,
@@ -1260,7 +1261,7 @@ public class Graph<K, VV, EV> {
 
 			Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
 
-			if(vertexIterator.hasNext()) {
+			if (vertexIterator.hasNext()) {
 				function.iterateEdges(vertexIterator.next(), edgesIterable, out);
 			} else {
 				throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
@@ -1299,8 +1300,8 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
-	 * Reverse the direction of the edges in the graph
-	 * 
+	 * Reverse the direction of the edges in the graph.
+	 *
 	 * @return a new graph with all edges reversed
 	 * @throws UnsupportedOperationException
 	 */
@@ -1357,7 +1358,7 @@ public class Graph<K, VV, EV> {
 	/**
 	 * Adds the input vertex to the graph. If the vertex already
 	 * exists in the graph, it will not be added again.
-	 * 
+	 *
 	 * @param vertex the vertex to be added
 	 * @return the new graph containing the existing vertices as well as the one just added
 	 */
@@ -1404,7 +1405,7 @@ public class Graph<K, VV, EV> {
 	/**
 	 * Adds the given edge to the graph. If the source and target vertices do
 	 * not exist in the graph, they will also be added.
-	 * 
+	 *
 	 * @param source the source vertex of the edge
 	 * @param target the target vertex of the edge
 	 * @param edgeValue the edge value
@@ -1421,7 +1422,7 @@ public class Graph<K, VV, EV> {
 	/**
 	 * Adds the given list edges to the graph.
 	 *
-	 * When adding an edge for a non-existing set of vertices, the edge is considered invalid and ignored.
+	 * <p>When adding an edge for a non-existing set of vertices, the edge is considered invalid and ignored.
 	 *
 	 * @param newEdges the data set of edges to be added
 	 * @return a new graph containing the existing edges plus the newly added edges.
@@ -1461,7 +1462,7 @@ public class Graph<K, VV, EV> {
 
 	/**
 	 * Removes the given vertex and its edges from the graph.
-	 * 
+	 *
 	 * @param vertex the vertex to remove
 	 * @return the new graph containing the existing vertices and edges without
 	 *         the removed vertex and its edges
@@ -1481,8 +1482,7 @@ public class Graph<K, VV, EV> {
 	 * 		   and edges removed.
 	 */
 
-	public Graph<K, VV, EV> removeVertices(List<Vertex<K, VV>> verticesToBeRemoved)
-	{
+	public Graph<K, VV, EV> removeVertices(List<Vertex<K, VV>> verticesToBeRemoved) {
 		return removeVertices(this.context.fromCollection(verticesToBeRemoved));
 	}
 
@@ -1537,7 +1537,7 @@ public class Graph<K, VV, EV> {
 
 	 /**
 	 * Removes all edges that match the given edge from the graph.
-	 * 
+	 *
 	 * @param edge the edge to remove
 	 * @return the new graph containing the existing vertices and edges without
 	 *         the removed edges
@@ -1592,7 +1592,7 @@ public class Graph<K, VV, EV> {
 	/**
 	 * Performs union on the vertices and edges sets of the input graphs
 	 * removing duplicate vertices but maintaining duplicate edges.
-	 * 
+	 *
 	 * @param graph the graph to perform union with
 	 * @return a new graph
 	 */
@@ -1616,7 +1616,7 @@ public class Graph<K, VV, EV> {
 	 * Performs Difference on the vertex and edge sets of the input graphs
 	 * removes common vertices and edges. If a source/target vertex is removed,
 	 * its corresponding edge will also be removed
-	 * 
+	 *
 	 * @param graph the graph to perform difference with
 	 * @return a new graph where the common vertices and edges have been removed
 	 */
@@ -1628,15 +1628,15 @@ public class Graph<K, VV, EV> {
 	/**
 	 * Performs intersect on the edge sets of the input graphs. Edges are considered equal, if they
 	 * have the same source identifier, target identifier and edge value.
-	 * <p>
-	 * The method computes pairs of equal edges from the input graphs. If the same edge occurs
+	 *
+	 * <p>The method computes pairs of equal edges from the input graphs. If the same edge occurs
 	 * multiple times in the input graphs, there will be multiple edge pairs to be considered. Each
 	 * edge instance can only be part of one pair. If the given parameter {@code distinctEdges} is set
 	 * to {@code true}, there will be exactly one edge in the output graph representing all pairs of
 	 * equal edges. If the parameter is set to {@code false}, both edges of each pair will be in the
 	 * output.
-	 * <p>
-	 * Vertices in the output graph will have no vertex values.
+	 *
+	 * <p>Vertices in the output graph will have no vertex values.
 	 *
 	 * @param graph the graph to perform intersect with
 	 * @param distinctEdges if set to {@code true}, there will be exactly one edge in the output graph
@@ -1709,7 +1709,7 @@ public class Graph<K, VV, EV> {
 			Iterator<Edge<K, EV>> rightIt = edgesRight.iterator();
 
 			// collect pairs once
-			while(leftIt.hasNext() && rightIt.hasNext()) {
+			while (leftIt.hasNext() && rightIt.hasNext()) {
 				out.collect(leftIt.next());
 				out.collect(rightIt.next());
 			}
@@ -1723,7 +1723,7 @@ public class Graph<K, VV, EV> {
 	 * @param scatterFunction the scatter function
 	 * @param gatherFunction the gather function
 	 * @param maximumNumberOfIterations maximum number of iterations to perform
-	 * 
+	 *
 	 * @return the updated Graph after the scatter-gather iteration has converged or
 	 * after maximumNumberOfIterations.
 	 */
@@ -1743,7 +1743,7 @@ public class Graph<K, VV, EV> {
 	 * @param gatherFunction the gather function
 	 * @param maximumNumberOfIterations maximum number of iterations to perform
 	 * @param parameters the iteration configuration parameters
-	 * 
+	 *
 	 * @return the updated Graph after the scatter-gather iteration has converged or
 	 * after maximumNumberOfIterations.
 	 */
@@ -1818,12 +1818,12 @@ public class Graph<K, VV, EV> {
 	 * @param computeFunction the vertex compute function
 	 * @param combiner an optional message combiner
 	 * @param maximumNumberOfIterations maximum number of iterations to perform
-	 * 
+	 *
 	 * @return the updated Graph after the vertex-centric iteration has converged or
 	 * after maximumNumberOfIterations.
 	 */
 	public <M> Graph<K, VV, EV> runVertexCentricIteration(
-			ComputeFunction<K, VV, EV, M> computeFunction, 
+			ComputeFunction<K, VV, EV, M> computeFunction,
 			MessageCombiner<K, M> combiner, int maximumNumberOfIterations) {
 
 		return this.runVertexCentricIteration(computeFunction, combiner, maximumNumberOfIterations, null);
@@ -1831,12 +1831,12 @@ public class Graph<K, VV, EV> {
 
 	/**
 	 * Runs a {@link VertexCentricIteration} on the graph with configuration options.
-	 * 
+	 *
 	 * @param computeFunction the vertex compute function
 	 * @param combiner an optional message combiner
 	 * @param maximumNumberOfIterations maximum number of iterations to perform
 	 * @param parameters the {@link VertexCentricConfiguration} parameters
-	 * 
+	 *
 	 * @return the updated Graph after the vertex-centric iteration has converged or
 	 * after maximumNumberOfIterations.
 	 */
@@ -1881,10 +1881,10 @@ public class Graph<K, VV, EV> {
 	 * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
 	 * of each vertex. The neighborsFunction applied on the neighbors only has access to both the vertex id
 	 * and the vertex value of the grouping vertex.
-	 * 
-	 * For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
+	 *
+	 * <p>For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
 	 * with the specified direction, and emit any number of output elements, including none.
-	 * 
+	 *
 	 * @param neighborsFunction the group reduce function to apply to the neighboring edges and vertices
 	 * of each vertex.
 	 * @param direction the edge direction (in-, out-, all-).
@@ -1928,10 +1928,10 @@ public class Graph<K, VV, EV> {
 	 * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
 	 * of each vertex. The neighborsFunction applied on the neighbors only has access to both the vertex id
 	 * and the vertex value of the grouping vertex.
-	 * 
-	 * For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
+	 *
+	 * <p>For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
 	 * with the specified direction, and emit any number of output elements, including none.
-	 * 
+	 *
 	 * @param neighborsFunction the group reduce function to apply to the neighboring edges and vertices
 	 * of each vertex.
 	 * @param direction the edge direction (in-, out-, all-).
@@ -1979,10 +1979,10 @@ public class Graph<K, VV, EV> {
 	 * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
 	 * of each vertex. The neighborsFunction applied on the neighbors only has access to the vertex id
 	 * (not the vertex value) of the grouping vertex.
-	 * 
-	 * For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
+	 *
+	 * <p>For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
 	 * with the specified direction, and emit any number of output elements, including none.
-	 * 
+	 *
 	 * @param neighborsFunction the group reduce function to apply to the neighboring edges and vertices
 	 * of each vertex.
 	 * @param direction the edge direction (in-, out-, all-).
@@ -2027,10 +2027,10 @@ public class Graph<K, VV, EV> {
 	 * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
 	 * of each vertex. The neighborsFunction applied on the neighbors only has access to the vertex id
 	 * (not the vertex value) of the grouping vertex.
-	 * 
-	 * For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
+	 *
+	 * <p>For each vertex, the neighborsFunction can iterate over all neighbors of this vertex
 	 * with the specified direction, and emit any number of output elements, including none.
-	 * 
+	 *
 	 * @param neighborsFunction the group reduce function to apply to the neighboring edges and vertices
 	 * of each vertex.
 	 * @param direction the edge direction (in-, out-, all-).
@@ -2105,7 +2105,7 @@ public class Graph<K, VV, EV> {
 		}
 
 		@SuppressWarnings("unchecked")
-		public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex, 
+		public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
 				Collector<Tuple2<K, VV>> out) {
 			out.collect(new Tuple2<>((K) edge.getField(fieldPosition), otherVertex.getValue()));
 		}
@@ -2180,7 +2180,7 @@ public class Graph<K, VV, EV> {
 		}
 
 		public void coGroup(Iterable<Vertex<K, VV>> vertex,
-				final Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithNeighbors, 
+				final Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithNeighbors,
 				Collector<T> out) throws Exception {
 
 			final Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighborsIterator = new Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() {

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
index e72b853..46875b6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
@@ -35,7 +35,7 @@ import org.apache.flink.api.java.DataSet;
 public interface GraphAnalytic<K, VV, EV, T> {
 
 	/**
-	 * This method must be called after the program has executed:
+	 * This method must be called after the program has executed.
 	 *  1) "run" analytics and algorithms
 	 *  2) call ExecutionEnvironment.execute()
 	 *  3) get analytic results

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java
new file mode 100644
index 0000000..2e7c5b2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for {@link GraphAnalytic}.
+ *
+ * @param <K> key type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <T> the return type
+ */
+public abstract class GraphAnalyticBase<K, VV, EV, T>
+implements GraphAnalytic<K, VV, EV, T> {
+
+	protected ExecutionEnvironment env;
+
+	@Override
+	public GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input)
+			throws Exception {
+		env = input.getContext();
+		return this;
+	}
+
+	@Override
+	public T execute()
+			throws Exception {
+		env.execute();
+		return getResult();
+	}
+
+	@Override
+	public T execute(String jobName)
+			throws Exception {
+		Preconditions.checkNotNull(jobName);
+
+		env.execute(jobName);
+		return getResult();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
index 6547f9a..6f5570f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphCsvReader.java
@@ -85,7 +85,6 @@ public class GraphCsvReader {
 				new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), context);
 	}
 
-
 	public <K, VV> GraphCsvReader(String edgePath, final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
 			this(new Path(Preconditions.checkNotNull(edgePath, "The file path may not be null.")), mapper, context);
 	}
@@ -93,7 +92,7 @@ public class GraphCsvReader {
 	/**
 	 * Creates a Graph from CSV input with vertex values and edge values.
 	 * The vertex values are specified through a vertices input file or a user-defined map function.
-	 * 
+	 *
 	 * @param vertexKey the type of the vertex IDs
 	 * @param vertexValue the type of the vertex values
 	 * @param edgeValue the type of the edge values
@@ -226,7 +225,7 @@ public class GraphCsvReader {
 	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader lineDelimiterVertices(String delimiter) {
-		if(this.vertexReader != null) {
+		if (this.vertexReader != null) {
 			this.vertexReader.lineDelimiter(delimiter);
 		}
 		return this;
@@ -240,7 +239,7 @@ public class GraphCsvReader {
 	 * @return The GraphCsv reader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader fieldDelimiterVertices(String delimiter) {
-		if(this.vertexReader != null) {
+		if (this.vertexReader != null) {
 			this.vertexReader.fieldDelimiter(delimiter);
 		}
 		return this;
@@ -280,7 +279,7 @@ public class GraphCsvReader {
 	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader parseQuotedStringsVertices(char quoteCharacter) {
-		if(this.vertexReader != null) {
+		if (this.vertexReader != null) {
 			this.vertexReader.parseQuotedStrings(quoteCharacter);
 		}
 		return this;
@@ -295,7 +294,7 @@ public class GraphCsvReader {
 	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader ignoreCommentsVertices(String commentPrefix) {
-		if(this.vertexReader != null) {
+		if (this.vertexReader != null) {
 			this.vertexReader.ignoreComments(commentPrefix);
 		}
 		return this;
@@ -327,7 +326,7 @@ public class GraphCsvReader {
 	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader includeFieldsVertices(boolean ... vertexFields) {
-		if(this.vertexReader != null) {
+		if (this.vertexReader != null) {
 			this.vertexReader.includeFields(vertexFields);
 		}
 		return this;
@@ -364,7 +363,7 @@ public class GraphCsvReader {
 	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader includeFieldsVertices(String mask) {
-		if(this.vertexReader != null) {
+		if (this.vertexReader != null) {
 			this.vertexReader.includeFields(mask);
 		}
 		return this;
@@ -396,8 +395,8 @@ public class GraphCsvReader {
 	 * non-zero bit.
 	 * The parser will skip over all fields where the character at the corresponding bit is zero, and
 	 * include the fields where the corresponding bit is one.
-	 * <p>
-	 * Examples:
+	 *
+	 * <p>Examples:
 	 * <ul>
 	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
 	 *   <li>A mask of {@code 0x26} (binary {@code 100110} would skip the first fields, include fields
@@ -408,7 +407,7 @@ public class GraphCsvReader {
 	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader includeFieldsVertices(long mask) {
-		if(this.vertexReader != null) {
+		if (this.vertexReader != null) {
 			this.vertexReader.includeFields(mask);
 		}
 		return this;
@@ -422,8 +421,8 @@ public class GraphCsvReader {
 	 * non-zero bit.
 	 * The parser will skip over all fields where the character at the corresponding bit is zero, and
 	 * include the fields where the corresponding bit is one.
-	 * <p>
-	 * Examples:
+	 *
+	 * <p>Examples:
 	 * <ul>
 	 *   <li>A mask of {@code 0x7} would include the first three fields.</li>
 	 *   <li>A mask of {@code 0x26} (binary {@code 100110} would skip the first fields, include fields
@@ -454,7 +453,7 @@ public class GraphCsvReader {
 	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader ignoreFirstLineVertices() {
-		if(this.vertexReader != null) {
+		if (this.vertexReader != null) {
 			this.vertexReader.ignoreFirstLine();
 		}
 		return this;
@@ -478,7 +477,7 @@ public class GraphCsvReader {
 	 * @return The GraphCSVReader instance itself, to allow for fluent function chaining.
 	 */
 	public GraphCsvReader ignoreInvalidLinesVertices() {
-		if(this.vertexReader != null) {
+		if (this.vertexReader != null) {
 			this.vertexReader.ignoreInvalidLines();
 		}
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
index 964d20e..85bc9ef 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
@@ -18,38 +18,38 @@
 
 package org.apache.flink.graph;
 
-import java.util.HashMap;
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.graph.spargel.GatherFunction;
 import org.apache.flink.util.Preconditions;
 
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * This is used as a base class for vertex-centric iteration or gather-sum-apply iteration configuration.
  */
 public abstract class IterationConfiguration {
 
-	/** the iteration name **/
+	// the iteration name
 	private String name;
 
-	/** the iteration parallelism **/
+	// the iteration parallelism
 	private int parallelism = -1;
 
-	/** the iteration aggregators **/
+	// the iteration aggregators
 	private Map<String, Aggregator<?>> aggregators = new HashMap<>();
 
-	/** flag that defines whether the solution set is kept in managed memory **/
+	// flag that defines whether the solution set is kept in managed memory
 	private boolean unmanagedSolutionSet = false;
 
-	/** flag that defines whether the number of vertices option is set **/
+	// flag that defines whether the number of vertices option is set
 	private boolean optNumVertices = false;
-	
+
 	public IterationConfiguration() {}
 
 	/**
 	 * Sets the name for the iteration. The name is displayed in logs and messages.
-	 * 
+	 *
 	 * @param name The name for the iteration.
 	 */
 	public void setName(String name) {
@@ -58,13 +58,13 @@ public abstract class IterationConfiguration {
 
 	/**
 	 * Gets the name of the iteration.
-	 * @param defaultName 
-	 * 
+	 * @param defaultName
+	 *
 	 * @return The name of the iteration.
 	 */
 	public String getName(String defaultName) {
 		if (name != null) {
-			return name;			
+			return name;
 		}
 		else {
 			return defaultName;
@@ -73,17 +73,17 @@ public abstract class IterationConfiguration {
 
 	/**
 	 * Sets the parallelism for the iteration.
-	 * 
+	 *
 	 * @param parallelism The parallelism.
 	 */
 	public void setParallelism(int parallelism) {
 		Preconditions.checkArgument(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
 		this.parallelism = parallelism;
 	}
-	
+
 	/**
 	 * Gets the iteration's parallelism.
-	 * 
+	 *
 	 * @return The iterations parallelism, or -1, if not set.
 	 */
 	public int getParallelism() {
@@ -94,18 +94,18 @@ public abstract class IterationConfiguration {
 	 * Defines whether the solution set is kept in managed memory (Flink's internal way of keeping object
 	 * in serialized form) or as a simple object map.
 	 * By default, the solution set runs in managed memory.
-	 * 
+	 *
 	 * @param unmanaged True, to keep the solution set in unmanaged memory, false otherwise.
 	 */
 	public void setSolutionSetUnmanagedMemory(boolean unmanaged) {
 		this.unmanagedSolutionSet = unmanaged;
 	}
-	
+
 	/**
 	 * Gets whether the solution set is kept in managed memory (Flink's internal way of keeping object
 	 * in serialized form) or as a simple object map.
 	 * By default, the solution set runs in managed memory.
-	 * 
+	 *
 	 * @return True, if the solution set is in unmanaged memory, false otherwise.
 	 */
 	public boolean isSolutionSetUnmanagedMemory() {
@@ -136,8 +136,8 @@ public abstract class IterationConfiguration {
 	 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
 	 * via {@link GatherFunction#getIterationAggregator(String)} and
 	 * {@link GatherFunction#getPreviousIterationAggregate(String)}.
-	 * 
-	 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution. 
+	 *
+	 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution.
 	 * @param aggregator The aggregator.
 	 */
 	public void registerAggregator(String name, Aggregator<?> aggregator) {

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
index 1a32204..01ea9d6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.graph;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.util.Collector;
 
+import java.io.Serializable;
+
 /**
  * Interface to be implemented by the function applied to a vertex neighborhood
  * in the {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)}
@@ -39,16 +39,16 @@ public interface NeighborsFunction<K, VV, EV, O> extends Function, Serializable
 	/**
 	 * This method is called per vertex and can iterate over all of its neighbors
 	 * with the specified direction.
-	 * <p>
-	 * If called with {@link EdgeDirection#OUT} the group will contain
+	 *
+	 * <p>If called with {@link EdgeDirection#OUT} the group will contain
 	 * the out-edges and neighboring vertices of the grouping vertex.
 	 * If called with {@link EdgeDirection#IN} the group will contain
 	 * the in-edges and neighboring vertices of the grouping vertex.
 	 * If called with {@link EdgeDirection#ALL} the group will contain
 	 * all edges and neighboring vertices of the grouping vertex.
-	 * <p>
-	 * The method can emit any number of output elements, including none.
-	 * 
+	 *
+	 * <p>The method can emit any number of output elements, including none.
+	 *
 	 * @param neighbors the neighbors of the grouping vertex.
 	 * The first filed of each Tuple3 is the ID of the grouping vertex.
 	 * The second field is the neighboring edge, and the third field is the neighboring vertex.

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
index 657238c..41aedb0 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
@@ -18,12 +18,12 @@
 
 package org.apache.flink.graph;
 
-import java.io.Serializable;
-
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.util.Collector;
 
+import java.io.Serializable;
+
 /**
  * Interface to be implemented by the function applied to a vertex neighborhood
  * in the {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}
@@ -39,16 +39,16 @@ public interface NeighborsFunctionWithVertexValue<K, VV, EV, O> extends Function
 	/**
 	 * This method is called per vertex and can iterate over all of its neighbors
 	 * with the specified direction.
-	 * <p>
-	 * If called with {@link EdgeDirection#OUT} the group will contain
+	 *
+	 * <p>If called with {@link EdgeDirection#OUT} the group will contain
 	 * the out-edges and neighboring vertices of the grouping vertex.
 	 * If called with {@link EdgeDirection#IN} the group will contain
 	 * the in-edges and neighboring vertices of the grouping vertex.
 	 * If called with {@link EdgeDirection#ALL} the group will contain
 	 * all edges and neighboring vertices of the grouping vertex.
-	 * <p>
-	 * The method can emit any number of output elements, including none.
-	 * 
+	 *
+	 * <p>The method can emit any number of output elements, including none.
+	 *
 	 * @param vertex the grouping Vertex
 	 * @param neighbors the neighbors of the grouping vertex.
 	 * The first filed of each Tuple3 is the ID of the grouping vertex.

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
index e7631a1..0cc6e72 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
@@ -34,7 +34,7 @@ public interface ReduceEdgesFunction<EV> extends Function, Serializable {
 	 * It combines two neighboring edge values into one new value of the same type.
 	 * For each vertex, this function is consecutively called,
 	 * until only a single value for each edge remains.
-	 * 
+	 *
 	 * @param firstEdgeValue the first neighboring edge value to combine
 	 * @param secondEdgeValue the second neighboring edge value to combine
 	 * @return the combined value of both input values

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
index 5b423e2..cb4ee60 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
@@ -35,7 +35,7 @@ public interface ReduceNeighborsFunction <VV> extends Function, Serializable {
 	 * It combines two neighboring vertex values into one new value of the same type.
 	 * For each vertex, this function is consecutively called,
 	 * until only a single value for each vertex remains.
-	 * 
+	 *
 	 * @param firstNeighborValue the first neighboring vertex value to combine
 	 * @param secondNeighborValue the second neighboring vertex value to combine
 	 * @return the combined value of both input values

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
index 2ae0903..77a577f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Triplet.java
@@ -35,7 +35,7 @@ public class Triplet <K, VV, EV> extends Tuple5<K, K, VV, VV, EV> {
 	public Triplet() {}
 
 	/**
-	 * Constructs a Triplet from a given source vertex, target vertex and edge
+	 * Constructs a Triplet from a given source vertex, target vertex, and edge.
 	 *
 	 * @param srcVertex
 	 * @param trgVertex

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
index f40dac9..aaa75f8 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/VertexJoinFunction.java
@@ -35,7 +35,7 @@ public interface VertexJoinFunction<VV, T> extends Function, Serializable {
 	/**
 	 * Applies a transformation on the current vertex value
 	 * and the value of the matched tuple of the input DataSet.
-	 * 
+	 *
 	 * @param vertexValue the current vertex value
 	 * @param inputValue the value of the matched Tuple2 input
 	 * @return the new vertex value

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/AbstractDataSetAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/AbstractDataSetAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/AbstractDataSetAnalytic.java
deleted file mode 100644
index 46007ca..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/AbstractDataSetAnalytic.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.dataset;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.util.Preconditions;
-
-/**
- * Base class for {@link DataSetAnalytic}.
- *
- * @param <T> element type
- * @param <R> the return type
- */
-public abstract class AbstractDataSetAnalytic<T, R>
-implements DataSetAnalytic<T, R> {
-
-	protected ExecutionEnvironment env;
-
-	@Override
-	public AbstractDataSetAnalytic<T, R> run(DataSet<T> input)
-			throws Exception {
-		env = input.getExecutionEnvironment();
-		return this;
-	}
-
-	@Override
-	public R execute()
-			throws Exception {
-		env.execute();
-		return getResult();
-	}
-
-	@Override
-	public R execute(String jobName)
-			throws Exception {
-		Preconditions.checkNotNull(jobName);
-
-		env.execute(jobName);
-		return getResult();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
index 1f8fe99..35a8876 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/ChecksumHashCode.java
@@ -18,14 +18,15 @@
 
 package org.apache.flink.graph.asm.dataset;
 
-import org.apache.commons.lang3.builder.EqualsBuilder;
-import org.apache.commons.lang3.builder.HashCodeBuilder;
 import org.apache.flink.api.common.accumulators.Accumulator;
 import org.apache.flink.api.common.accumulators.SimpleAccumulator;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
 import java.io.IOException;
 
 /**
@@ -35,7 +36,7 @@ import java.io.IOException;
  * @param <T> element type
  */
 public class ChecksumHashCode<T>
-extends AbstractDataSetAnalytic<T, Checksum> {
+extends DataSetAnalyticBase<T, Checksum> {
 
 	private static final String CHECKSUM = "checksum";
 
@@ -136,11 +137,19 @@ extends AbstractDataSetAnalytic<T, Checksum> {
 
 		@Override
 		public boolean equals(Object obj) {
-			if (obj == null) { return false; }
-			if (obj == this) { return true; }
-			if (obj.getClass() != getClass()) { return false; }
+			if (obj == null) {
+				return false;
+			}
+
+			if (obj == this) {
+				return true;
+			}
+
+			if (obj.getClass() != getClass()) {
+				return false;
+			}
 
-			Checksum rhs = (Checksum)obj;
+			Checksum rhs = (Checksum) obj;
 
 			return new EqualsBuilder()
 				.append(count, rhs.count)

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
index 771a044..ad2886f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Collect.java
@@ -34,7 +34,7 @@ import java.util.List;
  * @param <T> element type
  */
 public class Collect<T>
-extends AbstractDataSetAnalytic<T, List<T>> {
+extends DataSetAnalyticBase<T, List<T>> {
 
 	private static final String COLLECT = "collect";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java
index 7bc97d5..34ef979 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/Count.java
@@ -30,7 +30,7 @@ import java.io.IOException;
  * @param <T> element type
  */
 public class Count<T>
-extends AbstractDataSetAnalytic<T, Long> {
+extends DataSetAnalyticBase<T, Long> {
 
 	private static final String COUNT = "count";
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
index abf4039..9c5c448 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalytic.java
@@ -35,7 +35,7 @@ import org.apache.flink.api.java.operators.CustomUnaryOperation;
 public interface DataSetAnalytic<T, R> {
 
 	/**
-	 * This method must be called after the program has executed:
+	 * This method must be called after the program has executed.
 	 *  1) "run" analytics and algorithms
 	 *  2) call ExecutionEnvironment.execute()
 	 *  3) get analytic results

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalyticBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalyticBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalyticBase.java
new file mode 100644
index 0000000..a1df14c
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/dataset/DataSetAnalyticBase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.dataset;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for {@link DataSetAnalytic}.
+ *
+ * @param <T> element type
+ * @param <R> the return type
+ */
+public abstract class DataSetAnalyticBase<T, R>
+implements DataSetAnalytic<T, R> {
+
+	protected ExecutionEnvironment env;
+
+	@Override
+	public DataSetAnalyticBase<T, R> run(DataSet<T> input)
+			throws Exception {
+		env = input.getExecutionEnvironment();
+		return this;
+	}
+
+	@Override
+	public R execute()
+			throws Exception {
+		env.execute();
+		return getResult();
+	}
+
+	@Override
+	public R execute(String jobName)
+			throws Exception {
+		Preconditions.checkNotNull(jobName);
+
+		env.execute(jobName);
+		return getResult();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
index b91b4cb..a4791e9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
@@ -30,8 +30,13 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.LongValue;
 
+/**
+ * Common user-defined-functions.
+ */
 public class DegreeAnnotationFunctions {
 
+	private DegreeAnnotationFunctions() {}
+
 	// --------------------------------------------------------------------------------------------
 	//  Vertex functions
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
index 6f808f3..27c829b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -67,7 +67,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Deg
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! EdgeDegreesPair.class.isAssignableFrom(other.getClass())) {
+		if (!EdgeDegreesPair.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
index 03fd1ba..3c4e611 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -66,7 +66,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) {
+		if (!EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index 7526d00..94788e2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -66,7 +66,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) {
+		if (!EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index f73d37b..0333b8b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -32,7 +32,7 @@ import org.apache.flink.graph.EdgeOrder;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.graph.utils.MurmurHash;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.ByteValue;
@@ -93,7 +93,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! VertexDegrees.class.isAssignableFrom(other.getClass())) {
+		if (!VertexDegrees.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 
@@ -271,7 +271,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 	extends Tuple3<LongValue, LongValue, LongValue> {
 		private static final int HASH_SEED = 0x3a12fc31;
 
-		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+		private MurmurHash hasher = new MurmurHash(HASH_SEED);
 
 		public Degrees() {
 			this(new LongValue(), new LongValue(), new LongValue());

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 5fdd8f9..f316b9b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -86,7 +86,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! VertexInDegree.class.isAssignableFrom(other.getClass())) {
+		if (!VertexInDegree.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index 8e3e9c6..b04391d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -86,7 +86,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! VertexOutDegree.class.isAssignableFrom(other.getClass())) {
+		if (!VertexOutDegree.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index 71b4891..c6d0646 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -88,7 +88,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, L
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
+		if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index ee9a144..88bab5a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -86,7 +86,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
+		if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index 1255d86..21918c7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -86,7 +86,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
+		if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index b731548..9ea99a7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -21,15 +21,15 @@ package org.apache.flink.graph.asm.degree.annotate.undirected;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
 import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.DegreeCount;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId;
+import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
+import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Preconditions;
 
@@ -106,7 +106,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingDataSet other) {
 		Preconditions.checkNotNull(other);
 
-		if (! VertexDegree.class.isAssignableFrom(other.getClass())) {
+		if (!VertexDegree.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
index e5eea61..ae1e5b6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -123,7 +123,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
 		Preconditions.checkNotNull(other);
 
-		if (! MaximumDegree.class.isAssignableFrom(other.getClass())) {
+		if (!MaximumDegree.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 15c8359..d978096 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -64,7 +64,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
 		Preconditions.checkNotNull(other);
 
-		if (! Simplify.class.isAssignableFrom(other.getClass())) {
+		if (!Simplify.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index a3c007e..617dce1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -49,7 +49,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 	 * Simplifies an undirected graph by adding reverse edges and removing
 	 * self-loops and duplicate edges.
 	 *
-	 * When clip-and-flip is set, edges where source < target are removed
+	 * <p>When clip-and-flip is set, edges where source < target are removed
 	 * before symmetrizing the graph.
 	 *
 	 * @param clipAndFlip method for generating simple graph
@@ -82,7 +82,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
 		Preconditions.checkNotNull(other);
 
-		if (! Simplify.class.isAssignableFrom(other.getClass())) {
+		if (!Simplify.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
index f7cc601..9c4f88e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
@@ -71,7 +71,7 @@ public class Translate {
 		Preconditions.checkNotNull(vertices);
 		Preconditions.checkNotNull(translator);
 
-		Class<Vertex<NEW, VV>> vertexClass = (Class<Vertex<NEW, VV>>)(Class<? extends Vertex>) Vertex.class;
+		Class<Vertex<NEW, VV>> vertexClass = (Class<Vertex<NEW, VV>>) (Class<? extends Vertex>) Vertex.class;
 		TypeInformation<OLD> oldType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(0);
 		TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false);
 		TypeInformation<VV> vertexValueType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(1);
@@ -146,7 +146,7 @@ public class Translate {
 		Preconditions.checkNotNull(edges);
 		Preconditions.checkNotNull(translator);
 
-		Class<Edge<NEW, EV>> edgeClass = (Class<Edge<NEW, EV>>)(Class<? extends Edge>) Edge.class;
+		Class<Edge<NEW, EV>> edgeClass = (Class<Edge<NEW, EV>>) (Class<? extends Edge>) Edge.class;
 		TypeInformation<OLD> oldType = ((TupleTypeInfo<Edge<OLD, EV>>) edges.getType()).getTypeAt(0);
 		TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false);
 		TypeInformation<EV> edgeValueType = ((TupleTypeInfo<Edge<OLD, EV>>) edges.getType()).getTypeAt(2);
@@ -222,7 +222,7 @@ public class Translate {
 		Preconditions.checkNotNull(vertices);
 		Preconditions.checkNotNull(translator);
 
-		Class<Vertex<K, NEW>> vertexClass = (Class<Vertex<K, NEW>>)(Class<? extends Vertex>) Vertex.class;
+		Class<Vertex<K, NEW>> vertexClass = (Class<Vertex<K, NEW>>) (Class<? extends Vertex>) Vertex.class;
 		TypeInformation<K> idType = ((TupleTypeInfo<Vertex<K, OLD>>) vertices.getType()).getTypeAt(0);
 		TypeInformation<OLD> oldType = ((TupleTypeInfo<Vertex<K, OLD>>) vertices.getType()).getTypeAt(1);
 		TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false);
@@ -297,7 +297,7 @@ public class Translate {
 		Preconditions.checkNotNull(edges);
 		Preconditions.checkNotNull(translator);
 
-		Class<Edge<K, NEW>> edgeClass = (Class<Edge<K, NEW>>)(Class<? extends Edge>) Edge.class;
+		Class<Edge<K, NEW>> edgeClass = (Class<Edge<K, NEW>>) (Class<? extends Edge>) Edge.class;
 		TypeInformation<K> idType = ((TupleTypeInfo<Edge<K, OLD>>) edges.getType()).getTypeAt(0);
 		TypeInformation<OLD> oldType = ((TupleTypeInfo<Edge<K, OLD>>) edges.getType()).getTypeAt(2);
 		TypeInformation<NEW> newType = TypeExtractor.getUnaryOperatorReturnType(translator, TranslateFunction.class, false, false, oldType, null, false);

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index c8000e4..3956a81 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -79,7 +79,7 @@ extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
 		Preconditions.checkNotNull(other);
 
-		if (! TranslateEdgeValues.class.isAssignableFrom(other.getClass())) {
+		if (!TranslateEdgeValues.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateFunction.java
index 7f495bf..fb1e3c1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateFunction.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateFunction.java
@@ -26,8 +26,8 @@ import java.io.Serializable;
  * Base interface for Translate functions. Translate functions take elements and transform them,
  * element wise. A Translate function always produces a single result element for each input element.
  * Typical applications are transcribing between data types or manipulating element values.
- * <p>
- * Translate functions are used within the Graph API and by translating GraphAlgorithms.
+ *
+ * <p>Translate functions are used within the Graph API and by translating GraphAlgorithms.
  *
  * @param <T> Type of the input elements.
  * @param <O> Type of the returned elements.

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index 58cb7e2..d8c5676 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -30,7 +30,7 @@ import static org.apache.flink.graph.asm.translate.Translate.translateEdgeIds;
 import static org.apache.flink.graph.asm.translate.Translate.translateVertexIds;
 
 /**
- * Translate {@link Vertex} and {@link Edge} IDs of a {@link Graph} using the given {@link TranslateFunction}
+ * Translate {@link Vertex} and {@link Edge} IDs of a {@link Graph} using the given {@link TranslateFunction}.
  *
  * @param <OLD> old graph ID type
  * @param <NEW> new graph ID type
@@ -47,7 +47,7 @@ extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
 	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
-	 * Translate {@link Vertex} and {@link Edge} IDs of a {@link Graph} using the given {@link TranslateFunction}
+	 * Translate {@link Vertex} and {@link Edge} IDs of a {@link Graph} using the given {@link TranslateFunction}.
 	 *
 	 * @param translator implements conversion from {@code OLD} to {@code NEW}
 	 */
@@ -81,7 +81,7 @@ extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
 		Preconditions.checkNotNull(other);
 
-		if (! TranslateGraphIds.class.isAssignableFrom(other.getClass())) {
+		if (!TranslateGraphIds.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d313ac76/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index 7447e11..452cb26 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -79,7 +79,7 @@ extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> {
 	protected boolean mergeConfiguration(GraphAlgorithmWrappingGraph other) {
 		Preconditions.checkNotNull(other);
 
-		if (! TranslateVertexValues.class.isAssignableFrom(other.getClass())) {
+		if (!TranslateVertexValues.class.isAssignableFrom(other.getClass())) {
 			return false;
 		}