Posted to commits@flink.apache.org by gr...@apache.org on 2016/06/07 17:48:02 UTC

flink git commit: [FLINK-3906] [gelly] Global Clustering Coefficient

Repository: flink
Updated Branches:
  refs/heads/master 7160a6812 -> 8da1a75ce


[FLINK-3906] [gelly] Global Clustering Coefficient

The global clustering coefficient measures the connectedness of a graph.
Scores range from 0.0 (no triangles) to 1.0 (complete graph).

This closes #1997


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8da1a75c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8da1a75c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8da1a75c

Branch: refs/heads/master
Commit: 8da1a75ceb30ef1bce27a2426dab3a0f66b94b64
Parents: 7160a68
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue May 17 10:02:47 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jun 7 12:26:15 2016 -0400

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  38 ++-
 .../graph/library/TriangleCountITCase.java      |  53 ----
 .../org/apache/flink/graph/scala/Graph.scala    |  18 +-
 .../flink/graph/AbstractGraphAnalytic.java      |  62 +++++
 .../main/java/org/apache/flink/graph/Graph.java |  32 ++-
 .../org/apache/flink/graph/GraphAnalytic.java   |  74 ++++++
 .../flink/graph/library/GSATriangleCount.java   | 192 --------------
 .../undirected/GlobalClusteringCoefficient.java | 155 ++++++++++++
 .../undirected/LocalClusteringCoefficient.java  |   2 +-
 .../clustering/undirected/TriangleCount.java    |  78 ++++++
 .../clustering/undirected/TriangleListing.java  |   4 +-
 .../metric/undirected/VertexMetrics.java        | 251 +++++++++++++++++++
 .../GlobalClusteringCoefficientTest.java        |  84 +++++++
 .../LocalClusteringCoefficientTest.java         |   8 +-
 .../undirected/TriangleCountTest.java           |  75 ++++++
 .../metric/undirected/VertexMetricsTest.java    |  97 +++++++
 16 files changed, 954 insertions(+), 269 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index 7adff04..45fdbe5 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -1828,12 +1828,13 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc
 * [GSA PageRank](#gsa-pagerank)
 * [Single Source Shortest Paths](#single-source-shortest-paths)
 * [GSA Single Source Shortest Paths](#gsa-single-source-shortest-paths)
-* [GSA Triangle Count](#gsa-triangle-count)
+* [Triangle Count](#triangle-count)
 * [Triangle Enumerator](#triangle-enumerator)
 * [Hyperlink-Induced Topic Search](#hyperlink-induced-topic-search)
 * [Summarization](#summarization)
 * [Jaccard Index](#jaccard-index)
 * [Local Clustering Coefficient](#local-clustering-coefficient)
+* [Global Clustering Coefficient](#global-clustering-coefficient)
 
 Gelly's library methods can be used by simply calling the `run()` method on the input graph:
 
@@ -1997,19 +1998,23 @@ The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-app
 
 See the [Single Source Shortest Paths](#single-source-shortest-paths) library method for implementation details and usage information.
 
-### GSA Triangle Count
+### Triangle Count
 
 #### Overview
-An implementation of the Triangle Count algorithm. Given an input graph, it returns the number of unique triangles in it.
+An analytic for counting the number of unique triangles in a graph.
 
 #### Details
-This algorithm operates in three phases. First, vertices select neighbors with IDs greater than theirs
-and send messages to them. Each received message is then propagated to neighbors with higher IDs.
-Finally, if a node encounters the target ID in the list of received messages, it increments the number of discovered triangles.
+Counts the triangles generated by [Triangle Listing](#triangle-listing).
 
 #### Usage
-The algorithm takes an undirected, unweighted graph as input and outputs a `DataSet` which contains a single integer corresponding to the number of triangles
-in the graph. The algorithm constructor takes no arguments.
+The analytic takes an undirected graph as input and returns a `Long` corresponding to the number of triangles
+in the graph. The graph ID type must be `Comparable` and `Copyable`.
+
+### Triangle Listing
+
+This algorithm supports object reuse. The graph ID type must be `Comparable` and `Copyable`.
+
+See the [Triangle Enumerator](#triangle-enumerator) library method for implementation details.
 
 ### Triangle Enumerator
 
@@ -2108,7 +2113,22 @@ See the [Triangle Enumeration](#triangle-enumeration) library method for a detai
 
 #### Usage
 The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing the vertex ID,
-vertex degree, and number of triangles containing the vertex. The vertex ID must be `Comparable` and `Copyable`.
+vertex degree, and number of triangles containing the vertex. The graph ID type must be `Comparable` and `Copyable`.
+
+### Global Clustering Coefficient
+
+#### Overview
+The global clustering coefficient measures the connectedness of a graph. Scores range from 0.0 (no edges between
+neighbors) to 1.0 (complete graph).
+
+#### Details
+See the [Local Clustering Coefficient](#local-clustering-coefficient) library method for a detailed explanation of
+clustering coefficient.
+
+#### Usage
+The algorithm takes a simple, undirected graph as input and outputs a result containing the total number of triplets and
+triangles in the graph. The graph ID type must be `Comparable` and `Copyable`.
+
 
 {% top %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
deleted file mode 100644
index aaada8f..0000000
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.examples.data.TriangleCountData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class TriangleCountITCase extends MultipleProgramsTestBase {
-
-	public TriangleCountITCase(TestExecutionMode mode) {
-		super(mode);
-	}
-
-	@Test
-	public void testGSATriangleCount() throws Exception {
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
-				env).getUndirected();
-
-		List<Integer> numberOfTriangles = graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect();
-		String expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
-
-		Assert.assertEquals(numberOfTriangles.get(0).intValue(), Integer.parseInt(expectedResult));
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index f31619d..3881aae 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -1103,19 +1103,35 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    * @return a Dataset of Tuple2, with one tuple per vertex.
    * The first field of the Tuple2 is the vertex ID and the second field
    * is the aggregate value computed by the provided [[ReduceNeighborsFunction]].
-  */
+   */
   def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection):
   DataSet[(K, EV)] = {
     wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction)).map(jtuple => (jtuple.f0,
       jtuple.f1))
   }
 
+  /**
+   * @param algorithm the algorithm to run on the Graph
+   * @return the result of the graph algorithm
+   */
   def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]):
   T = {
     jgraph.run(algorithm)
   }
 
   /**
+   * A GraphAnalytic is similar to a GraphAlgorithm but is terminal and results
+   * are retrieved via accumulators.  A Flink program has a single point of
+   * execution. A GraphAnalytic defers execution to the user to allow composing
+   * multiple analytics and algorithms into a single program.
+   *
+   * @param analytic the analytic to run on the Graph
+   */
+  def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]) = {
+    jgraph.run(analytic)
+  }
+
+  /**
    * Runs a scatter-gather iteration on the graph.
    * No configuration options are provided.
    *

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
new file mode 100644
index 0000000..b13e82e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for {@link GraphAnalytic}.
+ *
+ * @param <K> key type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <T> the return type
+ */
+public abstract class AbstractGraphAnalytic<K, VV, EV, T>
+implements GraphAnalytic<K, VV, EV, T> {
+
+	protected ExecutionEnvironment env;
+
+	@Override
+	public GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input)
+			throws Exception {
+		env = input.getContext();
+		return null;
+	}
+
+	@Override
+	public T execute()
+			throws Exception {
+		Preconditions.checkNotNull(env);
+
+		env.execute();
+		return getResult();
+	}
+
+	@Override
+	public T execute(String jobName)
+			throws Exception {
+		Preconditions.checkNotNull(jobName);
+		Preconditions.checkNotNull(env);
+
+		env.execute(jobName);
+		return getResult();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index b17f7a5..dd25cfd 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -18,20 +18,13 @@
 
 package org.apache.flink.graph;
 
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.List;
-import java.util.Arrays;
-
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
@@ -71,6 +64,13 @@ import org.apache.flink.graph.validation.GraphValidator;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
 /**
  * Represents a Graph consisting of {@link Edge edges} and {@link Vertex
  * vertices}.
@@ -1788,6 +1788,20 @@ public class Graph<K, VV, EV> {
 	}
 
 	/**
+	 * A {@code GraphAnalytic} is similar to a {@link GraphAlgorithm} but is terminal
+	 * and results are retrieved via accumulators.  A Flink program has a single
+	 * point of execution. A {@code GraphAnalytic} defers execution to the user to
+	 * allow composing multiple analytics and algorithms into a single program.
+	 *
+	 * @param analytic the analytic to run on the Graph
+	 * @param <T> the result type
+	 * @throws Exception
+	 */
+	public <T> void run(GraphAnalytic<K, VV, EV, T> analytic) throws Exception {
+		analytic.run(this);
+	}
+
+	/**
 	 * Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
 	 * of each vertex. The neighborsFunction applied on the neighbors only has access to both the vertex id
 	 * and the vertex value of the grouping vertex.

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
new file mode 100644
index 0000000..dd221dc
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+
+/**
+ * A {@code GraphAnalytic} is similar to a {@link GraphAlgorithm} but is terminal
+ * and results are retrieved via accumulators.  A Flink program has a single
+ * point of execution. A {@code GraphAnalytic} defers execution to the user to
+ * allow composing multiple analytics and algorithms into a single program.
+ *
+ * @param <K> key type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <T> the return type
+ */
+public interface GraphAnalytic<K, VV, EV, T> {
+
+	/**
+	 * This method must be called after the program has executed:
+	 *  1) "run" analytics and algorithms
+	 *  2) call ExecutionEnvironment.execute()
+	 *  3) get analytics results
+	 *
+	 * @return the result
+	 */
+	T getResult();
+
+	/**
+	 * Execute the program and return the result.
+	 *
+	 * @return the result
+	 * @throws Exception
+	 */
+	T execute() throws Exception;
+
+	/**
+	 * Execute the program and return the result.
+	 *
+	 * @param jobName the name to assign to the job
+	 * @return the result
+	 * @throws Exception
+	 */
+	T execute(String jobName) throws Exception;
+
+	/**
+	 * All {@code GraphAnalytic} processing must be terminated by an
+	 * {@link OutputFormat}. Results are obtained via accumulators rather than
+	 * returned by a {@link DataSet}.
+	 *
+	 * @param input input graph
+	 * @return this
+	 * @throws Exception
+	 */
+	GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input) throws Exception;
+}
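
The ordering described in the `getResult()` Javadoc above (run the analytics, execute once, then read the results) can be illustrated without Flink. The following is a minimal plain-Java sketch of that deferred-execution pattern. Every name in it (`MiniEnv`, `EdgeCount`, `MaxVertexId`, `accumulate`) is a hypothetical stand-in invented for illustration and is not part of the Flink or Gelly API; in particular, the real interface takes a `Graph` rather than an environment and an edge array.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Plain-Java sketch of deferred execution; every name here is hypothetical, not a Flink API. */
final class DeferredAnalyticSketch {

    /** Stand-in for an execution environment: collects sinks and runs them in one pass. */
    static final class MiniEnv {
        private final List<Runnable> sinks = new ArrayList<>();
        private final Map<String, Long> accumulators = new HashMap<>();
        private int executions = 0;

        void addSink(Runnable sink) { sinks.add(sink); }
        void accumulate(String name, long value) { accumulators.merge(name, value, Long::sum); }
        Long accumulator(String name) { return accumulators.get(name); }
        int executions() { return executions; }

        /** The single point of execution: runs every registered sink once. */
        void execute() {
            executions++;
            for (Runnable sink : sinks) {
                sink.run();
            }
            sinks.clear();
        }
    }

    /** Stand-in analytic: run() only registers work, getResult() reads the accumulator. */
    static final class EdgeCount {
        private MiniEnv env;

        EdgeCount run(MiniEnv env, int[][] edges) {
            this.env = env;
            env.addSink(() -> env.accumulate("edges", edges.length));
            return this;
        }

        Long getResult() { return env.accumulator("edges"); }
    }

    /** Second stand-in analytic, so that two results can share one execution. */
    static final class MaxVertexId {
        private MiniEnv env;

        MaxVertexId run(MiniEnv env, int[][] edges) {
            this.env = env;
            env.addSink(() -> {
                long max = -1;
                for (int[] e : edges) {
                    max = Math.max(max, Math.max(e[0], e[1]));
                }
                env.accumulate("maxId", max);
            });
            return this;
        }

        Long getResult() { return env.accumulator("maxId"); }
    }

    public static void main(String[] args) {
        int[][] edges = {{0, 1}, {1, 2}, {0, 2}, {2, 3}};
        MiniEnv env = new MiniEnv();

        // 1) "run" the analytics: nothing is computed yet
        EdgeCount edgeCount = new EdgeCount().run(env, edges);
        MaxVertexId maxId = new MaxVertexId().run(env, edges);
        System.out.println(edgeCount.getResult()); // prints "null"

        // 2) execute the program once
        env.execute();

        // 3) read the results of both analytics
        System.out.println(edgeCount.getResult() + " " + maxId.getResult()); // prints "4 3"
    }
}
```

Calling `getResult()` before `execute()` yields `null` in this sketch, which mirrors why the interface asks callers to retrieve results only after the program has executed.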

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
deleted file mode 100644
index 1eafce2..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.ReduceNeighborsFunction;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.VertexJoinFunction;
-import org.apache.flink.types.NullValue;
-
-import java.util.TreeMap;
-
-/**
- * Triangle Count Algorithm.
- *
- * This algorithm operates in three phases. First, vertices select neighbors with id greater than theirs
- * and send messages to them. Each received message is then propagated to neighbors with higher id.
- * Finally, if a node encounters the target id in the list of received messages, it increments the number
- * of triangles found.
- *
- * This implementation is non-iterative.
- *
- * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet
- * which contains a single integer representing the number of triangles.
- */
-public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements
-		GraphAlgorithm<K, VV, EV, DataSet<Integer>> {
-
-	@SuppressWarnings("serial")
-	@Override
-	public DataSet<Integer> run(Graph<K, VV, EV> input) throws Exception {
-
-		ExecutionEnvironment env = input.getContext();
-
-		// order the edges so that src is always higher than trg
-		DataSet<Edge<K, NullValue>> edges = input.getEdges().map(new OrderEdges<K, EV>()).distinct();
-
-		Graph<K, TreeMap<K, Integer>, NullValue> graph = Graph.fromDataSet(edges,
-				new VertexInitializer<K>(), env);
-
-		// select neighbors with ids higher than the current vertex id
-		// Gather: a no-op in this case
-		// Sum: create the set of neighbors
-		DataSet<Tuple2<K, TreeMap<K, Integer>>> higherIdNeighbors =
-				graph.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
-
-		Graph<K, TreeMap<K, Integer>, NullValue> graphWithReinitializedVertexValues =
-				graph.mapVertices(new VertexInitializerEmptyTreeMap<K>());
-
-		// Apply: attach the computed values to the vertices
-		// joinWithVertices to update the node values
-		DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithHigherIdNeighbors =
-				graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues<K>()).getVertices();
-
-		Graph<K, TreeMap<K,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors,
-				edges, env);
-
-		// propagate each received value to neighbors with higher id
-		// Gather: a no-op in this case
-		// Sum: propagate values
-		DataSet<Tuple2<K, TreeMap<K, Integer>>> propagatedValues = graphWithNeighbors
-				.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
-
-		// Apply: attach propagated values to vertices
-		DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithPropagatedValues =
-				graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues<K>()).getVertices();
-
-		Graph<K, TreeMap<K, Integer>, NullValue> graphWithPropagatedNeighbors =
-				Graph.fromDataSet(verticesWithPropagatedValues, graphWithNeighbors.getEdges(), env);
-
-		// Scatter: compute the number of triangles
-		DataSet<Integer> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets()
-				.map(new ComputeTriangles<K>()).reduce(new ReduceFunction<Integer>() {
-
-					@Override
-					public Integer reduce(Integer first, Integer second) throws Exception {
-						return first + second;
-					}
-				});
-
-		return numberOfTriangles;
-	}
-
-	@SuppressWarnings("serial")
-	private static final class OrderEdges<K extends Comparable<K>, EV> implements
-		MapFunction<Edge<K, EV>, Edge<K, NullValue>> {
-
-		@Override
-		public Edge<K, NullValue> map(Edge<K, EV> edge) throws Exception {
-			if (edge.getSource().compareTo(edge.getTarget()) < 0) {
-				return new Edge<K, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance());
-			} else {
-				return new Edge<K, NullValue>(edge.getSource(), edge.getTarget(), NullValue.getInstance());
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class VertexInitializer<K> implements MapFunction<K, TreeMap<K, Integer>> {
-
-		@Override
-		public TreeMap<K, Integer> map(K value) throws Exception {
-			TreeMap<K, Integer> neighbors = new TreeMap<K, Integer>();
-			neighbors.put(value, 1);
-
-			return neighbors;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class VertexInitializerEmptyTreeMap<K> implements
-			MapFunction<Vertex<K, TreeMap<K, Integer>>, TreeMap<K, Integer>> {
-
-		@Override
-		public TreeMap<K, Integer> map(Vertex<K, TreeMap<K, Integer>> vertex) throws Exception {
-			return new TreeMap<K, Integer>();
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AttachValues<K> implements VertexJoinFunction<TreeMap<K, Integer>,
-			TreeMap<K, Integer>> {
-
-		@Override
-		public TreeMap<K, Integer> vertexJoin(TreeMap<K, Integer> vertexValue,
-				TreeMap<K, Integer> inputValue) {
-			return inputValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class GatherHigherIdNeighbors<K> implements
-		ReduceNeighborsFunction<TreeMap<K,Integer>> {
-
-		@Override
-		public TreeMap<K, Integer> reduceNeighbors(TreeMap<K,Integer> first, TreeMap<K,Integer> second) {
-			for (K key : second.keySet()) {
-				Integer value = first.get(key);
-				if (value != null) {
-					first.put(key, value + second.get(key));
-				} else {
-					first.put(key, second.get(key));
-				}
-			}
-			return first;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ComputeTriangles<K> implements MapFunction<Triplet<K, TreeMap<K, Integer>, NullValue>,
-			Integer> {
-
-		@Override
-		public Integer map(Triplet<K, TreeMap<K, Integer>, NullValue> triplet) throws Exception {
-
-			Vertex<K, TreeMap<K, Integer>> srcVertex = triplet.getSrcVertex();
-			Vertex<K, TreeMap<K, Integer>> trgVertex = triplet.getTrgVertex();
-			int triangles = 0;
-
-			if(trgVertex.getValue().get(srcVertex.getId()) != null) {
-				triangles = trgVertex.getValue().get(srcVertex.getId());
-			}
-			return triangles;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
new file mode 100644
index 0000000..fc89e43
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result;
+import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
+import org.apache.flink.types.CopyableValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * The global clustering coefficient measures the connectedness of a graph.
+ * Scores range from 0.0 (no triangles) to 1.0 (complete graph).
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class GlobalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+	private TriangleCount<K, VV, EV> triangleCount;
+
+	private VertexMetrics<K, VV, EV> vertexMetrics;
+
+	// Optional configuration
+	private int littleParallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Override the parallelism of operators processing small amounts of data.
+	 *
+	 * @param littleParallelism operator parallelism
+	 * @return this
+	 */
+	public GlobalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		this.littleParallelism = littleParallelism;
+
+		return this;
+	}
+
+	@Override
+	public GlobalClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		triangleCount = new TriangleCount<K, VV, EV>()
+			.setLittleParallelism(littleParallelism);
+
+		input.run(triangleCount);
+
+		vertexMetrics = new VertexMetrics<K, VV, EV>()
+			.setParallelism(littleParallelism);
+
+		input.run(vertexMetrics);
+
+		return this;
+	}
+
+	@Override
+	public Result getResult() {
+		return new Result(vertexMetrics.getResult().getNumberOfTriplets(), 3 * triangleCount.getResult());
+	}
+
+	/**
+	 * Wraps global clustering coefficient metrics.
+	 */
+	public static class Result {
+		private long tripletCount;
+		private long triangleCount;
+
+		public Result(long tripletCount, long triangleCount) {
+			this.tripletCount = tripletCount;
+			this.triangleCount = triangleCount;
+		}
+
+		/**
+		 * Get the number of triplets.
+		 *
+		 * @return number of triplets
+		 */
+		public long getNumberOfTriplets() {
+			return tripletCount;
+		}
+
+		/**
+		 * Get the number of triangles.
+		 *
+		 * @return number of triangles
+		 */
+		public long getNumberOfTriangles() {
+			return triangleCount;
+		}
+
+		/**
+		 * Get the global clustering coefficient score. This is computed as the
+		 * number of closed triplets (triangles) divided by the total number of
+		 * triplets.
+		 *
+		 * A score of {@code Double.NaN} is returned for a graph with no triplets,
+		 * such as a graph of isolated vertices, since both counts are then zero.
+		 *
+		 * @return global clustering coefficient score
+		 */
+		public double getLocalClusteringCoefficientScore() {
+			return (tripletCount == 0) ? Double.NaN : triangleCount / (double)tripletCount;
+		}
+
+		@Override
+		public String toString() {
+			return "triplet count: " + tripletCount + ", triangle count: " + triangleCount;
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder()
+				.append(tripletCount)
+				.append(triangleCount)
+				.hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == null) { return false; }
+			if (obj == this) { return true; }
+			if (obj.getClass() != getClass()) { return false; }
+
+			Result rhs = (Result)obj;
+
+			return new EqualsBuilder()
+				.append(tripletCount, rhs.tripletCount)
+				.append(triangleCount, rhs.triangleCount)
+				.isEquals();
+		}
+	}
+}
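
Note on the score arithmetic in GlobalClusteringCoefficient.Result above: the
second constructor argument is passed as 3 * triangleCount, so the score is
closed triplets over all triplets. The following is a minimal standalone sketch
of that arithmetic only, not of the Flink job; the class name
GlobalClusteringScoreSketch is hypothetical, and the sample numbers are taken
from the expected values in the tests of this commit (2 triangles and 13
triplets for the simple graph).

```java
// Sketch only: mirrors the arithmetic in Result, without any Flink dependency.
final class GlobalClusteringScoreSketch {

	// closedTripletCount is three times the number of distinct triangles,
	// because every triangle closes one triplet at each of its three vertices.
	static double score(long tripletCount, long closedTripletCount) {
		return (tripletCount == 0) ? Double.NaN : closedTripletCount / (double) tripletCount;
	}

	public static void main(String[] args) {
		long triangles = 2;  // expected by TriangleCountTest for the simple graph
		long triplets = 13;  // expected by VertexMetricsTest for the simple graph

		// 6 closed triplets out of 13 triplets, roughly 0.46
		System.out.println(score(triplets, 3 * triangles));

		// no triplets at all: the score is undefined and reported as NaN
		System.out.println(score(0, 0));
	}
}
```

A complete graph gives equal counts for both arguments and therefore a score of
1.0, which matches the expectation in testWithCompleteGraph.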

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index d1618d1..bc62d36 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -185,7 +185,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 	}
 
 	/**
-	 * Wraps the vertex type to encapsulate results from the Clustering Coefficient algorithm.
+	 * Wraps the vertex type to encapsulate results from the local clustering coefficient algorithm.
 	 *
 	 * @param <T> ID type
 	 */

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
new file mode 100644
index 0000000..bc43725
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.CountHelper;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.AbstractID;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Count the number of distinct triangles in an undirected graph.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @see TriangleListing
+ */
+public class TriangleCount<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Long> {
+
+	private String id = new AbstractID().toString();
+
+	// Optional configuration
+	private int littleParallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Override the parallelism of operators processing small amounts of data.
+	 *
+	 * @param littleParallelism operator parallelism
+	 * @return this
+	 */
+	public TriangleCount<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		this.littleParallelism = littleParallelism;
+
+		return this;
+	}
+
+	@Override
+	public TriangleCount<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		DataSet<Tuple3<K, K, K>> triangles = input
+			.run(new TriangleListing<K, VV, EV>()
+				.setSortTriangleVertices(false)
+				.setLittleParallelism(littleParallelism));
+
+		triangles.output(new CountHelper<Tuple3<K, K, K>>(id)).name("Count triangles");
+
+		return this;
+	}
+
+	@Override
+	public Long getResult() {
+		return env.getLastJobExecutionResult().<Long> getAccumulatorResult(id);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 1319d02..6245433 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -286,11 +286,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
 		@Override
 		public Tuple3<T, T, T> map(Tuple3<T, T, T> value)
 				throws Exception {
-			T temp_val;
-
 			// by the triangle listing algorithm we know f1 < f2
 			if (value.f0.compareTo(value.f1) > 0) {
-				temp_val = value.f0;
+				T temp_val = value.f0;
 				value.f0 = value.f1;
 
 				if (temp_val.compareTo(value.f2) <= 0) {

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
new file mode 100644
index 0000000..41ae27a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.undirected;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Compute the number of vertices, number of edges, and number of triplets in
+ * an undirected graph.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class VertexMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+	private String id = new AbstractID().toString();
+
+	// Optional configuration
+	private boolean includeZeroDegreeVertices = false;
+
+	private boolean reduceOnTargetId = false;
+
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * By default only the edge set is processed for the computation of degree.
+	 * When this flag is set an additional join is performed against the vertex
+	 * set in order to output vertices with a degree of zero.
+	 *
+	 * @param includeZeroDegreeVertices whether to output vertices with a
+	 *                                  degree of zero
+	 * @return this
+	 */
+	public VertexMetrics<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
+		this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+
+		return this;
+	}
+
+	/**
+	 * The degree can be counted from either the edge source or target IDs.
+	 * By default the source IDs are counted. Reducing on target IDs may
+	 * optimize the algorithm if the input edge list is sorted by target ID.
+	 *
+	 * @param reduceOnTargetId set to {@code true} if the input edge list
+	 *                         is sorted by target ID
+	 * @return this
+	 */
+	public VertexMetrics<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) {
+		this.reduceOnTargetId = reduceOnTargetId;
+
+		return this;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public VertexMetrics<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		DataSet<Vertex<K, LongValue>> vertexDegree = input
+			.run(new VertexDegree<K, VV, EV>()
+				.setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
+				.setReduceOnTargetId(reduceOnTargetId)
+				.setParallelism(parallelism));
+
+		vertexDegree.output(new VertexMetricsHelper<K>(id)).name("Vertex metrics");
+
+		return this;
+	}
+
+	@Override
+	public Result getResult() {
+		JobExecutionResult res = env.getLastJobExecutionResult();
+
+		long vertexCount = res.getAccumulatorResult(id + "-0");
+		long edgeCount = res.getAccumulatorResult(id + "-1");
+		long tripletCount = res.getAccumulatorResult(id + "-2");
+
+		return new Result(vertexCount, edgeCount / 2, tripletCount);
+	}
+
+	/**
+	 * Helper class to collect vertex metrics.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class VertexMetricsHelper<T>
+	extends RichOutputFormat<Vertex<T, LongValue>> {
+		private final String id;
+
+		private long vertexCount;
+		private long edgeCount;
+		private long tripletCount;
+
+		/**
+		 * This helper class collects vertex metrics by scanning over and
+		 * discarding elements from the given DataSet.
+		 *
+		 * The unique id is required because Flink's accumulator namespace is
+		 * shared among all operators.
+		 *
+		 * @param id unique string used for accumulator names
+		 */
+		public VertexMetricsHelper(String id) {
+			this.id = id;
+		}
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {}
+
+		@Override
+		public void writeRecord(Vertex<T, LongValue> record) throws IOException {
+			long degree = record.f1.getValue();
+
+			vertexCount++;
+			edgeCount += degree;
+			tripletCount += degree * (degree - 1) / 2;
+		}
+
+		@Override
+		public void close() throws IOException {
+			getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
+			getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount));
+			getRuntimeContext().addAccumulator(id + "-2", new LongCounter(tripletCount));
+		}
+	}
+
+	/**
+	 * Wraps vertex metrics.
+	 */
+	public static class Result {
+		private long vertexCount;
+		private long edgeCount;
+		private long tripletCount;
+
+		public Result(long vertexCount, long edgeCount, long tripletCount) {
+			this.vertexCount = vertexCount;
+			this.edgeCount = edgeCount;
+			this.tripletCount = tripletCount;
+		}
+
+		/**
+		 * Get the number of vertices.
+		 *
+		 * @return number of vertices
+		 */
+		public long getNumberOfVertices() {
+			return vertexCount;
+		}
+
+		/**
+		 * Get the number of edges.
+		 *
+		 * @return number of edges
+		 */
+		public long getNumberOfEdges() {
+			return edgeCount;
+		}
+
+		/**
+		 * Get the number of triplets.
+		 *
+		 * @return number of triplets
+		 */
+		public long getNumberOfTriplets() {
+			return tripletCount;
+		}
+
+		@Override
+		public String toString() {
+			return "vertex count: " + vertexCount
+					+ ", edge count: " + edgeCount
+					+ ", triplet count: " + tripletCount;
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder()
+				.append(vertexCount)
+				.append(edgeCount)
+				.append(tripletCount)
+				.hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == null) { return false; }
+			if (obj == this) { return true; }
+			if (obj.getClass() != getClass()) { return false; }
+
+			Result rhs = (Result)obj;
+
+			return new EqualsBuilder()
+				.append(vertexCount, rhs.vertexCount)
+				.append(edgeCount, rhs.edgeCount)
+				.append(tripletCount, rhs.tripletCount)
+				.isEquals();
+		}
+	}
+}
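
Note on the counting in VertexMetricsHelper.writeRecord above: each vertex of
degree d is the center of d * (d - 1) / 2 triplets, and summing degrees counts
every undirected edge twice, which is why getResult() halves the edge
accumulator. The following is a minimal standalone sketch of that bookkeeping
without Flink. The class name TripletCountSketch and the degree sequence
{2, 3, 3, 4, 1, 1} are assumptions; the sequence was chosen only because it
reproduces the 6 vertices, 7 edges, and 13 triplets expected by
VertexMetricsTest for the simple graph.

```java
// Sketch only: the same per-vertex accumulation as writeRecord, on a plain array.
final class TripletCountSketch {

	// Returns {vertexCount, edgeCount, tripletCount} for the degree sequence
	// of an undirected graph.
	static long[] metrics(long[] degrees) {
		long vertexCount = 0;
		long degreeSum = 0;
		long tripletCount = 0;

		for (long degree : degrees) {
			vertexCount++;
			degreeSum += degree;
			// number of neighbor pairs centered on this vertex
			tripletCount += degree * (degree - 1) / 2;
		}

		// each undirected edge contributes to the degree of both endpoints
		return new long[] {vertexCount, degreeSum / 2, tripletCount};
	}

	public static void main(String[] args) {
		// hypothetical degree sequence, see the note above
		long[] result = metrics(new long[] {2, 3, 3, 4, 1, 1});

		System.out.println("vertex count: " + result[0]
			+ ", edge count: " + result[1]
			+ ", triplet count: " + result[2]);
	}
}
```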

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java
new file mode 100644
index 0000000..71ec2a6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class GlobalClusteringCoefficientTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		Result expectedResult = new Result(13, 6);
+
+		Result globalClusteringCoefficient = new GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
+			.run(undirectedSimpleGraph)
+			.execute();
+
+		assertEquals(expectedResult, globalClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		long expectedDegree = completeGraphVertexCount - 1;
+		long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+
+		Result expectedResult = new Result(expectedCount, expectedCount);
+
+		Result globalClusteringCoefficient = new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(completeGraph)
+			.execute();
+
+		assertEquals(expectedResult, globalClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithEmptyGraph()
+			throws Exception {
+		Result expectedResult = new Result(0, 0);
+
+		Result globalClusteringCoefficient = new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(emptyGraph)
+			.execute();
+
+		assertEquals(expectedResult, globalClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		Result expectedResult = new Result(1003442, 225147);
+
+		Result globalClusteringCoefficient = new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(undirectedRMatGraph)
+			.execute();
+
+		assertEquals(expectedResult, globalClusteringCoefficient);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
index 414f200..3455df4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.junit.Test;
 
+import java.util.List;
+
 import static org.junit.Assert.assertEquals;
 
 public class LocalClusteringCoefficientTest
@@ -61,7 +63,11 @@ extends AsmTestBase {
 		DataSet<Result<LongValue>> cc = completeGraph
 			.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
 
-		for (Result<LongValue> result : cc.collect()) {
+		List<Result<LongValue>> results = cc.collect();
+
+		assertEquals(completeGraphVertexCount, results.size());
+
+		for (Result<LongValue> result : results) {
 			assertEquals(expectedDegree, result.getDegree().getValue());
 			assertEquals(expectedTriangleCount, result.getTriangleCount().getValue());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java
new file mode 100644
index 0000000..6bf9b0d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TriangleCountTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		long triangleCount = new TriangleCount<IntValue, NullValue, NullValue>()
+			.run(undirectedSimpleGraph)
+			.execute();
+
+		assertEquals(2, triangleCount);
+	}
+
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		long expectedDegree = completeGraphVertexCount - 1;
+		long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2) / 3;
+
+		long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
+			.run(completeGraph)
+			.execute();
+
+		assertEquals(expectedCount, triangleCount);
+	}
+
+	@Test
+	public void testWithEmptyGraph()
+			throws Exception {
+		long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
+			.run(emptyGraph)
+			.execute();
+
+		assertEquals(0, triangleCount);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
+			.run(undirectedRMatGraph)
+			.execute();
+
+		assertEquals(75049, triangleCount);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
new file mode 100644
index 0000000..a36ca94
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class VertexMetricsTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		Result expectedResult = new Result(6, 7, 13);
+
+		Result vertexMetrics = new VertexMetrics<IntValue, NullValue, NullValue>()
+			.run(undirectedSimpleGraph)
+			.execute();
+
+		assertEquals(expectedResult, vertexMetrics);
+	}
+
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		long expectedDegree = completeGraphVertexCount - 1;
+		long expectedEdges = completeGraphVertexCount * expectedDegree / 2;
+		long expectedTriplets = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+
+		Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets);
+
+		Result vertexMetrics = new VertexMetrics<LongValue, NullValue, NullValue>()
+			.run(completeGraph)
+			.execute();
+
+		assertEquals(expectedResult, vertexMetrics);
+	}
+
+	@Test
+	public void testWithEmptyGraph()
+			throws Exception {
+		Result expectedResult;
+
+		expectedResult = new Result(0, 0, 0);
+
+		Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
+			.setIncludeZeroDegreeVertices(false)
+			.run(emptyGraph)
+			.execute();
+
+		assertEquals(expectedResult, withoutZeroDegreeVertices);
+
+		expectedResult = new Result(3, 0, 0);
+
+		Result withZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
+			.setIncludeZeroDegreeVertices(true)
+			.run(emptyGraph)
+			.execute();
+
+		assertEquals(expectedResult, withZeroDegreeVertices);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		Result expectedResult = new Result(902, 10442, 1003442);
+
+		Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
+			.run(undirectedRMatGraph)
+			.execute();
+
+		assertEquals(expectedResult, withoutZeroDegreeVertices);
+	}
+}