You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/10/03 14:55:07 UTC
[2/2] flink git commit: [FLINK-4643] [gelly] Average Clustering
Coefficient
[FLINK-4643] [gelly] Average Clustering Coefficient
Directed and undirected analytics computing the average clustering
coefficient over vertices in a graph and an updated driver.
This closes #2528
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95c08eab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95c08eab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95c08eab
Branch: refs/heads/master
Commit: 95c08eab36bfa09c501a84f5b5f116d666d03ae1
Parents: a79efdc
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Sep 20 12:00:04 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Oct 3 10:37:02 2016 -0400
----------------------------------------------------------------------
.../graph/examples/ClusteringCoefficient.java | 32 ++-
.../directed/AverageClusteringCoefficient.java | 212 +++++++++++++++++++
.../AverageClusteringCoefficient.java | 212 +++++++++++++++++++
.../org/apache/flink/graph/asm/AsmTestBase.java | 4 +-
.../AverageClusteringCoefficientTest.java | 82 +++++++
.../AverageClusteringCoefficientTest.java | 82 +++++++
6 files changed, 622 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
index 7835531..615d765 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
@@ -102,6 +102,7 @@ public class ClusteringCoefficient {
// global and local clustering coefficient results
GraphAnalytic gcc;
+ GraphAnalytic acc;
DataSet lcc;
switch (parameters.get("input", "")) {
@@ -127,6 +128,9 @@ public class ClusteringCoefficient {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = graph
+ .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
@@ -134,6 +138,9 @@ public class ClusteringCoefficient {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = graph
+ .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
@@ -148,6 +155,9 @@ public class ClusteringCoefficient {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = graph
+ .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
@@ -155,6 +165,9 @@ public class ClusteringCoefficient {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = graph
+ .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
@@ -189,6 +202,9 @@ public class ClusteringCoefficient {
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = newGraph
+ .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false)
@@ -203,6 +219,9 @@ public class ClusteringCoefficient {
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = newGraph
+ .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false)
@@ -219,6 +238,9 @@ public class ClusteringCoefficient {
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = newGraph
+ .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false)
@@ -233,6 +255,9 @@ public class ClusteringCoefficient {
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = newGraph
+ .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false)
@@ -262,11 +287,13 @@ public class ClusteringCoefficient {
}
}
System.out.println(gcc.getResult());
+ System.out.println(acc.getResult());
break;
case "hash":
System.out.println(DataSetUtils.checksumHashCode(lcc));
System.out.println(gcc.getResult());
+ System.out.println(acc.getResult());
break;
case "csv":
@@ -280,7 +307,10 @@ public class ClusteringCoefficient {
lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
- System.out.println(gcc.execute());
+ env.execute("Clustering Coefficient");
+
+ System.out.println(gcc.getResult());
+ System.out.println(acc.getResult());
break;
default:
http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
new file mode 100644
index 0000000..f589d04
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * The average clustering coefficient measures the mean connectedness of a
+ * graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph).
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class AverageClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+ private String id = new AbstractID().toString();
+
+ // Optional configuration
+ private int littleParallelism = PARALLELISM_DEFAULT;
+
+ /**
+ * Override the parallelism of operators processing small amounts of data.
+ *
+ * @param littleParallelism operator parallelism
+ * @return this
+ */
+ public AverageClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
+ this.littleParallelism = littleParallelism;
+
+ return this;
+ }
+
+ /*
+ * Implementation notes:
+ *
+ * The requirement that "K extends CopyableValue<K>" can be removed once
+ * it has been removed from LocalClusteringCoefficient.
+ */
+
+ @Override
+ public AverageClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
+ throws Exception {
+ super.run(input);
+
+ DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
+ .run(new LocalClusteringCoefficient<K, VV, EV>()
+ .setLittleParallelism(littleParallelism));
+
+ localClusteringCoefficient
+ .output(new AverageClusteringCoefficientHelper<K>(id))
+ .name("Average clustering coefficient");
+
+ return this;
+ }
+
+ @Override
+ public Result getResult() {
+ JobExecutionResult res = env.getLastJobExecutionResult();
+
+ long vertexCount = res.getAccumulatorResult(id + "-0");
+ double sumOfLocalClusteringCoefficient = res.getAccumulatorResult(id + "-1");
+
+ return new Result(vertexCount, sumOfLocalClusteringCoefficient);
+ }
+
+ /**
+ * Output format which counts vertices and sums their local clustering coefficient scores, publishing both as accumulators on close.
+ *
+ * @param <T> ID type
+ */
+ private static class AverageClusteringCoefficientHelper<T>
+ extends RichOutputFormat<LocalClusteringCoefficient.Result<T>> {
+ private final String id;
+
+ private long vertexCount;
+ private double sumOfLocalClusteringCoefficient;
+
+ /**
+ * The unique id is required because Flink's accumulator namespace is
+ * shared among all operators.
+ *
+ * @param id unique string used for accumulator names
+ */
+ public AverageClusteringCoefficientHelper(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {}
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {}
+
+ @Override
+ public void writeRecord(LocalClusteringCoefficient.Result<T> record) throws IOException {
+ vertexCount++;
+
+ // the local clustering coefficient is only defined for vertices with at
+ // least two neighbors; other vertices are counted but add nothing to the sum
+ if (record.getDegree().getValue() > 1) {
+ sumOfLocalClusteringCoefficient += record.getLocalClusteringCoefficientScore();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
+ getRuntimeContext().addAccumulator(id + "-1", new DoubleCounter(sumOfLocalClusteringCoefficient));
+ }
+ }
+
+ /**
+ * Wraps average clustering coefficient metrics.
+ */
+ public static class Result {
+ private long vertexCount;
+
+ private double averageLocalClusteringCoefficient;
+
+ /**
+ * Instantiate a result; the average is stored as the given sum divided by the vertex count.
+ *
+ * @param vertexCount vertex count; a count of zero yields a non-finite average
+ * @param sumOfLocalClusteringCoefficient sum over the vertices' local
+ * clustering coefficients
+ */
+ public Result(long vertexCount, double sumOfLocalClusteringCoefficient) {
+ this.vertexCount = vertexCount;
+ this.averageLocalClusteringCoefficient = sumOfLocalClusteringCoefficient / vertexCount;
+ }
+
+ /**
+ * Get the number of vertices.
+ *
+ * @return number of vertices
+ */
+ public long getNumberOfVertices() {
+ return vertexCount;
+ }
+
+ /**
+ * Get the average clustering coefficient.
+ *
+ * @return average clustering coefficient
+ */
+ public double getAverageClusteringCoefficient() {
+ return averageLocalClusteringCoefficient;
+ }
+
+ @Override
+ public String toString() {
+ return "vertex count: " + vertexCount
+ + ", average clustering coefficient: " + averageLocalClusteringCoefficient;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(vertexCount)
+ .append(averageLocalClusteringCoefficient)
+ .hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) { return false; }
+ if (obj == this) { return true; }
+ if (obj.getClass() != getClass()) { return false; }
+
+ Result rhs = (Result)obj;
+
+ return new EqualsBuilder()
+ .append(vertexCount, rhs.vertexCount)
+ .append(averageLocalClusteringCoefficient, rhs.averageLocalClusteringCoefficient)
+ .isEquals();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
new file mode 100644
index 0000000..03dbc71
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * The average clustering coefficient measures the mean connectedness of a
+ * graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph).
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class AverageClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+ private String id = new AbstractID().toString();
+
+ // Optional configuration
+ private int littleParallelism = PARALLELISM_DEFAULT;
+
+ /**
+ * Override the parallelism of operators processing small amounts of data.
+ *
+ * @param littleParallelism operator parallelism
+ * @return this
+ */
+ public AverageClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
+ this.littleParallelism = littleParallelism;
+
+ return this;
+ }
+
+ /*
+ * Implementation notes:
+ *
+ * The requirement that "K extends CopyableValue<K>" can be removed once
+ * it has been removed from LocalClusteringCoefficient.
+ */
+
+ @Override
+ public AverageClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
+ throws Exception {
+ super.run(input);
+
+ DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
+ .run(new LocalClusteringCoefficient<K, VV, EV>()
+ .setLittleParallelism(littleParallelism));
+
+ localClusteringCoefficient
+ .output(new AverageClusteringCoefficientHelper<K>(id))
+ .name("Average clustering coefficient");
+
+ return this;
+ }
+
+ @Override
+ public Result getResult() {
+ JobExecutionResult res = env.getLastJobExecutionResult();
+
+ long vertexCount = res.getAccumulatorResult(id + "-0");
+ double sumOfLocalClusteringCoefficient = res.getAccumulatorResult(id + "-1");
+
+ return new Result(vertexCount, sumOfLocalClusteringCoefficient);
+ }
+
+ /**
+ * Output format which counts vertices and sums their local clustering coefficient scores, publishing both as accumulators on close.
+ *
+ * @param <T> ID type
+ */
+ private static class AverageClusteringCoefficientHelper<T>
+ extends RichOutputFormat<LocalClusteringCoefficient.Result<T>> {
+ private final String id;
+
+ private long vertexCount;
+ private double sumOfLocalClusteringCoefficient;
+
+ /**
+ * The unique id is required because Flink's accumulator namespace is
+ * shared among all operators.
+ *
+ * @param id unique string used for accumulator names
+ */
+ public AverageClusteringCoefficientHelper(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {}
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {}
+
+ @Override
+ public void writeRecord(LocalClusteringCoefficient.Result<T> record) throws IOException {
+ vertexCount++;
+
+ // the local clustering coefficient is only defined for vertices with at
+ // least two neighbors; other vertices are counted but add nothing to the sum
+ if (record.getDegree().getValue() > 1) {
+ sumOfLocalClusteringCoefficient += record.getLocalClusteringCoefficientScore();
+ }
+ }
+
+ @Override
+ public void close() throws IOException {
+ getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
+ getRuntimeContext().addAccumulator(id + "-1", new DoubleCounter(sumOfLocalClusteringCoefficient));
+ }
+ }
+
+ /**
+ * Wraps average clustering coefficient metrics.
+ */
+ public static class Result {
+ private long vertexCount;
+
+ private double averageLocalClusteringCoefficient;
+
+ /**
+ * Instantiate a result; the average is stored as the given sum divided by the vertex count.
+ *
+ * @param vertexCount vertex count; a count of zero yields a non-finite average
+ * @param sumOfLocalClusteringCoefficient sum over the vertices' local
+ * clustering coefficients
+ */
+ public Result(long vertexCount, double sumOfLocalClusteringCoefficient) {
+ this.vertexCount = vertexCount;
+ this.averageLocalClusteringCoefficient = sumOfLocalClusteringCoefficient / vertexCount;
+ }
+
+ /**
+ * Get the number of vertices.
+ *
+ * @return number of vertices
+ */
+ public long getNumberOfVertices() {
+ return vertexCount;
+ }
+
+ /**
+ * Get the average clustering coefficient.
+ *
+ * @return average clustering coefficient
+ */
+ public double getAverageClusteringCoefficient() {
+ return averageLocalClusteringCoefficient;
+ }
+
+ @Override
+ public String toString() {
+ return "vertex count: " + vertexCount
+ + ", average clustering coefficient: " + averageLocalClusteringCoefficient;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(vertexCount)
+ .append(averageLocalClusteringCoefficient)
+ .hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) { return false; }
+ if (obj == this) { return true; }
+ if (obj.getClass() != getClass()) { return false; }
+
+ Result rhs = (Result)obj;
+
+ return new EqualsBuilder()
+ .append(vertexCount, rhs.vertexCount)
+ .append(averageLocalClusteringCoefficient, rhs.averageLocalClusteringCoefficient)
+ .isEquals();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 28e2669..8ef87a5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -48,6 +48,8 @@ public class AsmTestBase {
protected Graph<LongValue,NullValue,NullValue> completeGraph;
// empty graph
+ protected final long emptyGraphVertexCount = 3;
+
protected Graph<LongValue,NullValue,NullValue> emptyGraph;
// RMat graph
@@ -86,7 +88,7 @@ public class AsmTestBase {
.generate();
// empty graph
- emptyGraph = new EmptyGraph(env, 3)
+ emptyGraph = new EmptyGraph(env, emptyGraphVertexCount)
.generate();
// RMat graph
http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java
new file mode 100644
index 0000000..9de9bac
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class AverageClusteringCoefficientTest
+extends AsmTestBase {
+
+ @Test
+ public void testWithSimpleGraph()
+ throws Exception {
+ // the second argument is the sum of the local scores, not the average; see results in LocalClusteringCoefficientTest.testSimpleGraph
+ Result expectedResult = new Result(6, 1.0/2 + 2.0/6 + 2.0/6 + 1.0/12);
+
+ Result averageClusteringCoefficient = new AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+ .run(directedSimpleGraph)
+ .execute();
+
+ assertEquals(expectedResult, averageClusteringCoefficient);
+ }
+
+ @Test
+ public void testWithCompleteGraph()
+ throws Exception {
+ Result expectedResult = new Result(completeGraphVertexCount, completeGraphVertexCount);
+
+ Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .run(completeGraph)
+ .execute();
+
+ assertEquals(expectedResult, averageClusteringCoefficient);
+ }
+
+ @Test
+ public void testWithEmptyGraph()
+ throws Exception {
+ Result expectedResult = new Result(emptyGraphVertexCount, 0);
+
+ Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .run(emptyGraph)
+ .execute();
+
+ assertEquals(expectedResult, averageClusteringCoefficient);
+ }
+
+ @Test
+ public void testWithRMatGraph()
+ throws Exception {
+ Result expectedResult = new Result(902, 297.152607);
+
+ Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .run(directedRMatGraph)
+ .execute();
+
+ assertEquals(expectedResult.getNumberOfVertices(), averageClusteringCoefficient.getNumberOfVertices());
+ assertEquals(expectedResult.getAverageClusteringCoefficient(), averageClusteringCoefficient.getAverageClusteringCoefficient(), 0.000001);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java
new file mode 100644
index 0000000..34fda17
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class AverageClusteringCoefficientTest
+extends AsmTestBase {
+
+ @Test
+ public void testWithSimpleGraph()
+ throws Exception {
+ // the second argument is the sum of the local scores, not the average; see results in LocalClusteringCoefficientTest.testSimpleGraph
+ Result expectedResult = new Result(6, 1.0/1 + 2.0/3 + 2.0/3 + 1.0/6);
+
+ Result averageClusteringCoefficient = new AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+ .run(undirectedSimpleGraph)
+ .execute();
+
+ assertEquals(expectedResult, averageClusteringCoefficient);
+ }
+
+ @Test
+ public void testWithCompleteGraph()
+ throws Exception {
+ Result expectedResult = new Result(completeGraphVertexCount, completeGraphVertexCount);
+
+ Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .run(completeGraph)
+ .execute();
+
+ assertEquals(expectedResult, averageClusteringCoefficient);
+ }
+
+ @Test
+ public void testWithEmptyGraph()
+ throws Exception {
+ Result expectedResult = new Result(emptyGraphVertexCount, 0);
+
+ Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .run(emptyGraph)
+ .execute();
+
+ assertEquals(expectedResult, averageClusteringCoefficient);
+ }
+
+ @Test
+ public void testWithRMatGraph()
+ throws Exception {
+ Result expectedResult = new Result(902, 380.40109);
+
+ Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .run(undirectedRMatGraph)
+ .execute();
+
+ assertEquals(expectedResult.getNumberOfVertices(), averageClusteringCoefficient.getNumberOfVertices());
+ assertEquals(expectedResult.getAverageClusteringCoefficient(), averageClusteringCoefficient.getAverageClusteringCoefficient(), 0.000001);
+ }
+}