You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/10/03 14:55:07 UTC

[2/2] flink git commit: [FLINK-4643] [gelly] Average Clustering Coefficient

[FLINK-4643] [gelly] Average Clustering Coefficient

Directed and undirected analytics computing the average clustering
coefficient over vertices in a graph and an updated driver.

This closes #2528


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95c08eab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95c08eab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95c08eab

Branch: refs/heads/master
Commit: 95c08eab36bfa09c501a84f5b5f116d666d03ae1
Parents: a79efdc
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Sep 20 12:00:04 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Oct 3 10:37:02 2016 -0400

----------------------------------------------------------------------
 .../graph/examples/ClusteringCoefficient.java   |  32 ++-
 .../directed/AverageClusteringCoefficient.java  | 212 +++++++++++++++++++
 .../AverageClusteringCoefficient.java           | 212 +++++++++++++++++++
 .../org/apache/flink/graph/asm/AsmTestBase.java |   4 +-
 .../AverageClusteringCoefficientTest.java       |  82 +++++++
 .../AverageClusteringCoefficientTest.java       |  82 +++++++
 6 files changed, 622 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
index 7835531..615d765 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
@@ -102,6 +102,7 @@ public class ClusteringCoefficient {
 
 		// global and local clustering coefficient results
 		GraphAnalytic gcc;
+		GraphAnalytic acc;
 		DataSet lcc;
 
 		switch (parameters.get("input", "")) {
@@ -127,6 +128,9 @@ public class ClusteringCoefficient {
 							gcc = graph
 								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
 							lcc = graph
 								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
@@ -134,6 +138,9 @@ public class ClusteringCoefficient {
 							gcc = graph
 								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
 							lcc = graph
 								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
@@ -148,6 +155,9 @@ public class ClusteringCoefficient {
 							gcc = graph
 								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
 							lcc = graph
 								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
@@ -155,6 +165,9 @@ public class ClusteringCoefficient {
 							gcc = graph
 								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
 							lcc = graph
 								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
@@ -189,6 +202,9 @@ public class ClusteringCoefficient {
 						gcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
 								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
 						lcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
 								.setIncludeZeroDegreeVertices(false)
@@ -203,6 +219,9 @@ public class ClusteringCoefficient {
 						gcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
 								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
 						lcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
 								.setIncludeZeroDegreeVertices(false)
@@ -219,6 +238,9 @@ public class ClusteringCoefficient {
 						gcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
 								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
 						lcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
 								.setIncludeZeroDegreeVertices(false)
@@ -233,6 +255,9 @@ public class ClusteringCoefficient {
 						gcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
 								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
 						lcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
 								.setIncludeZeroDegreeVertices(false)
@@ -262,11 +287,13 @@ public class ClusteringCoefficient {
 					}
 				}
 				System.out.println(gcc.getResult());
+				System.out.println(acc.getResult());
 				break;
 
 			case "hash":
 				System.out.println(DataSetUtils.checksumHashCode(lcc));
 				System.out.println(gcc.getResult());
+				System.out.println(acc.getResult());
 				break;
 
 			case "csv":
@@ -280,7 +307,10 @@ public class ClusteringCoefficient {
 
 				lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
 
-				System.out.println(gcc.execute());
+				env.execute("Clustering Coefficient");
+
+				System.out.println(gcc.getResult());
+				System.out.println(acc.getResult());
 				break;
 
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
new file mode 100644
index 0000000..f589d04
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * The average clustering coefficient measures the mean connectedness of a
+ * graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph).
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class AverageClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+	private String id = new AbstractID().toString();
+
+	// Optional configuration
+	private int littleParallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Override the parallelism of operators processing small amounts of data.
+	 *
+	 * @param littleParallelism operator parallelism
+	 * @return this
+	 */
+	public AverageClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		this.littleParallelism = littleParallelism;
+
+		return this;
+	}
+
+	/*
+	 * Implementation notes:
+	 *
+	 * The requirement that "K extends CopyableValue<K>" can be removed when
+	 *   removed from LocalClusteringCoefficient.
+	 */
+
+	@Override
+	public AverageClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
+			.run(new LocalClusteringCoefficient<K, VV, EV>()
+				.setLittleParallelism(littleParallelism));
+
+		localClusteringCoefficient
+			.output(new AverageClusteringCoefficientHelper<K>(id))
+				.name("Average clustering coefficient");
+
+		return this;
+	}
+
+	@Override
+	public Result getResult() {
+		JobExecutionResult res = env.getLastJobExecutionResult();
+
+		long vertexCount = res.getAccumulatorResult(id + "-0");
+		double sumOfLocalClusteringCoefficient = res.getAccumulatorResult(id + "-1");
+
+		return new Result(vertexCount, sumOfLocalClusteringCoefficient);
+	}
+
+	/**
+	 * Helper class to collect the average clustering coefficient.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class AverageClusteringCoefficientHelper<T>
+	extends RichOutputFormat<LocalClusteringCoefficient.Result<T>> {
+		private final String id;
+
+		private long vertexCount;
+		private double sumOfLocalClusteringCoefficient;
+
+		/**
+		 * The unique id is required because Flink's accumulator namespace is
+		 * shared among all operators.
+		 *
+		 * @param id unique string used for accumulator names
+		 */
+		public AverageClusteringCoefficientHelper(String id) {
+			this.id = id;
+		}
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {}
+
+		@Override
+		public void writeRecord(LocalClusteringCoefficient.Result<T> record) throws IOException {
+			vertexCount++;
+
+			// local clustering coefficient is only defined on vertices with
+			// at least two neighbors yielding at least one pair of neighbors
+			if (record.getDegree().getValue() > 1) {
+				sumOfLocalClusteringCoefficient += record.getLocalClusteringCoefficientScore();
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
+			getRuntimeContext().addAccumulator(id + "-1", new DoubleCounter(sumOfLocalClusteringCoefficient));
+		}
+	}
+
+	/**
+	 * Wraps global clustering coefficient metrics.
+	 */
+	public static class Result {
+		private long vertexCount;
+
+		private double averageLocalClusteringCoefficient;
+
+		/**
+		 * Instantiate an immutable result.
+		 *
+		 * @param vertexCount vertex count
+		 * @param sumOfLocalClusteringCoefficient sum over the vertices' local
+		 *                                        clustering coefficients
+		 */
+		public Result(long vertexCount, double sumOfLocalClusteringCoefficient) {
+			this.vertexCount = vertexCount;
+			this.averageLocalClusteringCoefficient = sumOfLocalClusteringCoefficient / vertexCount;
+		}
+
+		/**
+		 * Get the number of vertices.
+		 *
+		 * @return number of vertices
+		 */
+		public long getNumberOfVertices() {
+			return vertexCount;
+		}
+
+		/**
+		 * Get the average clustering coefficient.
+		 *
+		 * @return number of triangles
+		 */
+		public double getAverageClusteringCoefficient() {
+			return averageLocalClusteringCoefficient;
+		}
+
+		@Override
+		public String toString() {
+			return "vertex count: " + vertexCount
+				+ ", average clustering coefficient: " + averageLocalClusteringCoefficient;
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder()
+				.append(vertexCount)
+				.append(averageLocalClusteringCoefficient)
+				.hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == null) { return false; }
+			if (obj == this) { return true; }
+			if (obj.getClass() != getClass()) { return false; }
+
+			Result rhs = (Result)obj;
+
+			return new EqualsBuilder()
+				.append(vertexCount, rhs.vertexCount)
+				.append(averageLocalClusteringCoefficient, rhs.averageLocalClusteringCoefficient)
+				.isEquals();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
new file mode 100644
index 0000000..03dbc71
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * The average clustering coefficient measures the mean connectedness of a
+ * graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph).
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class AverageClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+	private String id = new AbstractID().toString();
+
+	// Optional configuration
+	private int littleParallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Override the parallelism of operators processing small amounts of data.
+	 *
+	 * @param littleParallelism operator parallelism
+	 * @return this
+	 */
+	public AverageClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		this.littleParallelism = littleParallelism;
+
+		return this;
+	}
+
+	/*
+	 * Implementation notes:
+	 *
+	 * The requirement that "K extends CopyableValue<K>" can be removed when
+	 *   removed from LocalClusteringCoefficient.
+	 */
+
+	@Override
+	public AverageClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
+			.run(new LocalClusteringCoefficient<K, VV, EV>()
+				.setLittleParallelism(littleParallelism));
+
+		localClusteringCoefficient
+			.output(new AverageClusteringCoefficientHelper<K>(id))
+				.name("Average clustering coefficient");
+
+		return this;
+	}
+
+	@Override
+	public Result getResult() {
+		JobExecutionResult res = env.getLastJobExecutionResult();
+
+		long vertexCount = res.getAccumulatorResult(id + "-0");
+		double sumOfLocalClusteringCoefficient = res.getAccumulatorResult(id + "-1");
+
+		return new Result(vertexCount, sumOfLocalClusteringCoefficient);
+	}
+
+	/**
+	 * Helper class to collect the average clustering coefficient.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class AverageClusteringCoefficientHelper<T>
+	extends RichOutputFormat<LocalClusteringCoefficient.Result<T>> {
+		private final String id;
+
+		private long vertexCount;
+		private double sumOfLocalClusteringCoefficient;
+
+		/**
+		 * The unique id is required because Flink's accumulator namespace is
+		 * shared among all operators.
+		 *
+		 * @param id unique string used for accumulator names
+		 */
+		public AverageClusteringCoefficientHelper(String id) {
+			this.id = id;
+		}
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {}
+
+		@Override
+		public void writeRecord(LocalClusteringCoefficient.Result<T> record) throws IOException {
+			vertexCount++;
+
+			// local clustering coefficient is only defined on vertices with
+			// at least two neighbors yielding at least one pair of neighbors
+			if (record.getDegree().getValue() > 1) {
+				sumOfLocalClusteringCoefficient += record.getLocalClusteringCoefficientScore();
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
+			getRuntimeContext().addAccumulator(id + "-1", new DoubleCounter(sumOfLocalClusteringCoefficient));
+		}
+	}
+
+	/**
+	 * Wraps global clustering coefficient metrics.
+	 */
+	public static class Result {
+		private long vertexCount;
+
+		private double averageLocalClusteringCoefficient;
+
+		/**
+		 * Instantiate an immutable result.
+		 *
+		 * @param vertexCount vertex count
+		 * @param sumOfLocalClusteringCoefficient sum over the vertices' local
+		 *                                        clustering coefficients
+		 */
+		public Result(long vertexCount, double sumOfLocalClusteringCoefficient) {
+			this.vertexCount = vertexCount;
+			this.averageLocalClusteringCoefficient = sumOfLocalClusteringCoefficient / vertexCount;
+		}
+
+		/**
+		 * Get the number of vertices.
+		 *
+		 * @return number of vertices
+		 */
+		public long getNumberOfVertices() {
+			return vertexCount;
+		}
+
+		/**
+		 * Get the average clustering coefficient.
+		 *
+		 * @return number of triangles
+		 */
+		public double getAverageClusteringCoefficient() {
+			return averageLocalClusteringCoefficient;
+		}
+
+		@Override
+		public String toString() {
+			return "vertex count: " + vertexCount
+				+ ", average clustering coefficient: " + averageLocalClusteringCoefficient;
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder()
+				.append(vertexCount)
+				.append(averageLocalClusteringCoefficient)
+				.hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == null) { return false; }
+			if (obj == this) { return true; }
+			if (obj.getClass() != getClass()) { return false; }
+
+			Result rhs = (Result)obj;
+
+			return new EqualsBuilder()
+				.append(vertexCount, rhs.vertexCount)
+				.append(averageLocalClusteringCoefficient, rhs.averageLocalClusteringCoefficient)
+				.isEquals();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 28e2669..8ef87a5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -48,6 +48,8 @@ public class AsmTestBase {
 	protected Graph<LongValue,NullValue,NullValue> completeGraph;
 
 	// empty graph
+	protected final long emptyGraphVertexCount = 3;
+
 	protected Graph<LongValue,NullValue,NullValue> emptyGraph;
 
 	// RMat graph
@@ -86,7 +88,7 @@ public class AsmTestBase {
 			.generate();
 
 		// empty graph
-		emptyGraph = new EmptyGraph(env, 3)
+		emptyGraph = new EmptyGraph(env, emptyGraphVertexCount)
 			.generate();
 
 		// RMat graph

http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java
new file mode 100644
index 0000000..9de9bac
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class AverageClusteringCoefficientTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		// see results in LocalClusteringCoefficientTest.testSimpleGraph
+		Result expectedResult = new Result(6, 1.0/2 + 2.0/6 + 2.0/6 + 1.0/12);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+			.run(directedSimpleGraph)
+			.execute();
+
+		assertEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		Result expectedResult = new Result(completeGraphVertexCount, completeGraphVertexCount);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(completeGraph)
+			.execute();
+
+		assertEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithEmptyGraph()
+			throws Exception {
+		Result expectedResult = new Result(emptyGraphVertexCount, 0);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(emptyGraph)
+			.execute();
+
+		assertEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		Result expectedResult = new Result(902, 297.152607);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(directedRMatGraph)
+			.execute();
+
+		assertEquals(expectedResult.getNumberOfVertices(), averageClusteringCoefficient.getNumberOfVertices());
+		assertEquals(expectedResult.getAverageClusteringCoefficient(), averageClusteringCoefficient.getAverageClusteringCoefficient(), 0.000001);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java
new file mode 100644
index 0000000..34fda17
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class AverageClusteringCoefficientTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		// see results in LocalClusteringCoefficientTest.testSimpleGraph
+		Result expectedResult = new Result(6, 1.0/1 + 2.0/3 + 2.0/3 + 1.0/6);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+			.run(undirectedSimpleGraph)
+			.execute();
+
+		assertEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		Result expectedResult = new Result(completeGraphVertexCount, completeGraphVertexCount);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(completeGraph)
+			.execute();
+
+		assertEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithEmptyGraph()
+			throws Exception {
+		Result expectedResult = new Result(emptyGraphVertexCount, 0);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(emptyGraph)
+			.execute();
+
+		assertEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		Result expectedResult = new Result(902, 380.40109);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(undirectedRMatGraph)
+			.execute();
+
+		assertEquals(expectedResult.getNumberOfVertices(), averageClusteringCoefficient.getNumberOfVertices());
+		assertEquals(expectedResult.getAverageClusteringCoefficient(), averageClusteringCoefficient.getAverageClusteringCoefficient(), 0.000001);
+	}
+}