Posted to commits@flink.apache.org by gr...@apache.org on 2016/06/07 17:48:02 UTC
flink git commit: [FLINK-3906] [gelly] Global Clustering Coefficient
Repository: flink
Updated Branches:
refs/heads/master 7160a6812 -> 8da1a75ce
[FLINK-3906] [gelly] Global Clustering Coefficient
The global clustering coefficient measures the connectedness of a graph.
Scores range from 0.0 (no triangles) to 1.0 (complete graph).
This closes #1997
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/8da1a75c
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/8da1a75c
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/8da1a75c
Branch: refs/heads/master
Commit: 8da1a75ceb30ef1bce27a2426dab3a0f66b94b64
Parents: 7160a68
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue May 17 10:02:47 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jun 7 12:26:15 2016 -0400
----------------------------------------------------------------------
docs/apis/batch/libs/gelly.md | 38 ++-
.../graph/library/TriangleCountITCase.java | 53 ----
.../org/apache/flink/graph/scala/Graph.scala | 18 +-
.../flink/graph/AbstractGraphAnalytic.java | 62 +++++
.../main/java/org/apache/flink/graph/Graph.java | 32 ++-
.../org/apache/flink/graph/GraphAnalytic.java | 74 ++++++
.../flink/graph/library/GSATriangleCount.java | 192 --------------
.../undirected/GlobalClusteringCoefficient.java | 155 ++++++++++++
.../undirected/LocalClusteringCoefficient.java | 2 +-
.../clustering/undirected/TriangleCount.java | 78 ++++++
.../clustering/undirected/TriangleListing.java | 4 +-
.../metric/undirected/VertexMetrics.java | 251 +++++++++++++++++++
.../GlobalClusteringCoefficientTest.java | 84 +++++++
.../LocalClusteringCoefficientTest.java | 8 +-
.../undirected/TriangleCountTest.java | 75 ++++++
.../metric/undirected/VertexMetricsTest.java | 97 +++++++
16 files changed, 954 insertions(+), 269 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index 7adff04..45fdbe5 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -1828,12 +1828,13 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc
* [GSA PageRank](#gsa-pagerank)
* [Single Source Shortest Paths](#single-source-shortest-paths)
* [GSA Single Source Shortest Paths](#gsa-single-source-shortest-paths)
-* [GSA Triangle Count](#gsa-triangle-count)
+* [Triangle Count](#triangle-count)
* [Triangle Enumerator](#triangle-enumerator)
* [Hyperlink-Induced Topic Search](#hyperlink-induced-topic-search)
* [Summarization](#summarization)
* [Jaccard Index](#jaccard-index)
* [Local Clustering Coefficient](#local-clustering-coefficient)
+* [Global Clustering Coefficient](#global-clustering-coefficient)
Gelly's library methods can be used by simply calling the `run()` method on the input graph:
@@ -1997,19 +1998,23 @@ The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-app
See the [Single Source Shortest Paths](#single-source-shortest-paths) library method for implementation details and usage information.
-### GSA Triangle Count
+### Triangle Count
#### Overview
-An implementation of the Triangle Count algorithm. Given an input graph, it returns the number of unique triangles in it.
+An analytic for counting the number of unique triangles in a graph.
#### Details
-This algorithm operates in three phases. First, vertices select neighbors with IDs greater than theirs
-and send messages to them. Each received message is then propagated to neighbors with higher IDs.
-Finally, if a node encounters the target ID in the list of received messages, it increments the number of discovered triangles.
+Counts the triangles generated by [Triangle Listing](#triangle-listing).
#### Usage
-The algorithm takes an undirected, unweighted graph as input and outputs a `DataSet` which contains a single integer corresponding to the number of triangles
-in the graph. The algorithm constructor takes no arguments.
+The analytic takes an undirected graph as input and returns a `Long` corresponding to the number of triangles
+in the graph. The graph ID type must be `Comparable` and `Copyable`.
+
+### Triangle Listing
+
+This algorithm supports object reuse. The graph ID type must be `Comparable` and `Copyable`.
+
+See the [Triangle Enumerator](#triangle-enumerator) library method for implementation details.
### Triangle Enumerator
@@ -2108,7 +2113,22 @@ See the [Triangle Enumeration](#triangle-enumeration) library method for a detai
#### Usage
The algorithm takes a simple, undirected graph as input and outputs a `DataSet` of tuples containing the vertex ID,
-vertex degree, and number of triangles containing the vertex. The vertex ID must be `Comparable` and `Copyable`.
+vertex degree, and number of triangles containing the vertex. The graph ID type must be `Comparable` and `Copyable`.
+
+### Global Clustering Coefficient
+
+#### Overview
+The global clustering coefficient measures the connectedness of a graph. Scores range from 0.0 (no edges between
+neighbors) to 1.0 (complete graph).
+
+#### Details
+See the [Local Clustering Coefficient](#local-clustering-coefficient) library method for a detailed explanation of
+clustering coefficient.
+
+#### Usage
+The algorithm takes a simple, undirected graph as input and outputs a result containing the total number of triplets and
+triangles in the graph. The graph ID type must be `Comparable` and `Copyable`.
+
{% top %}
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
deleted file mode 100644
index aaada8f..0000000
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/TriangleCountITCase.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.examples.data.TriangleCountData;
-import org.apache.flink.test.util.MultipleProgramsTestBase;
-import org.apache.flink.types.NullValue;
-import org.junit.Assert;
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.util.List;
-
-@RunWith(Parameterized.class)
-public class TriangleCountITCase extends MultipleProgramsTestBase {
-
- public TriangleCountITCase(TestExecutionMode mode) {
- super(mode);
- }
-
- @Test
- public void testGSATriangleCount() throws Exception {
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
- env).getUndirected();
-
- List<Integer> numberOfTriangles = graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect();
- String expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
-
- Assert.assertEquals(numberOfTriangles.get(0).intValue(), Integer.parseInt(expectedResult));
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index f31619d..3881aae 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -1103,19 +1103,35 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
* @return a Dataset of Tuple2, with one tuple per vertex.
* The first field of the Tuple2 is the vertex ID and the second field
* is the aggregate value computed by the provided [[ReduceNeighborsFunction]].
- */
+ */
def reduceOnEdges(reduceEdgesFunction: ReduceEdgesFunction[EV], direction: EdgeDirection):
DataSet[(K, EV)] = {
wrap(jgraph.reduceOnEdges(reduceEdgesFunction, direction)).map(jtuple => (jtuple.f0,
jtuple.f1))
}
+ /**
+ * @param algorithm the algorithm to run on the Graph
+ * @return the result of the graph algorithm
+ */
def run[T: TypeInformation : ClassTag](algorithm: GraphAlgorithm[K, VV, EV, T]):
T = {
jgraph.run(algorithm)
}
/**
+ * A GraphAnalytic is similar to a GraphAlgorithm but is terminal and results
+ * are retrieved via accumulators. A Flink program has a single point of
+ * execution. A GraphAnalytic defers execution to the user to allow composing
+ * multiple analytics and algorithms into a single program.
+ *
+ * @param analytic the analytic to run on the Graph
+ */
+ def run[T: TypeInformation : ClassTag](analytic: GraphAnalytic[K, VV, EV, T]) = {
+ jgraph.run(analytic)
+ }
+
+ /**
* Runs a scatter-gather iteration on the graph.
* No configuration options are provided.
*
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
new file mode 100644
index 0000000..b13e82e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/AbstractGraphAnalytic.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
+
+/**
+ * Base class for {@link GraphAnalytic}.
+ *
+ * @param <K> key type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <T> the return type
+ */
+public abstract class AbstractGraphAnalytic<K, VV, EV, T>
+implements GraphAnalytic<K, VV, EV, T> {
+
+ protected ExecutionEnvironment env;
+
+ @Override
+ public GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input)
+ throws Exception {
+ env = input.getContext();
+ return this;
+ }
+
+ @Override
+ public T execute()
+ throws Exception {
+ Preconditions.checkNotNull(env);
+
+ env.execute();
+ return getResult();
+ }
+
+ @Override
+ public T execute(String jobName)
+ throws Exception {
+ Preconditions.checkNotNull(jobName);
+ Preconditions.checkNotNull(env);
+
+ env.execute(jobName);
+ return getResult();
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index b17f7a5..dd25cfd 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -18,20 +18,13 @@
package org.apache.flink.graph;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.List;
-import java.util.Arrays;
-
import org.apache.flink.api.common.functions.CoGroupFunction;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
@@ -71,6 +64,13 @@ import org.apache.flink.graph.validation.GraphValidator;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+
/**
* Represents a Graph consisting of {@link Edge edges} and {@link Vertex
* vertices}.
@@ -1788,6 +1788,20 @@ public class Graph<K, VV, EV> {
}
/**
+ * A {@code GraphAnalytic} is similar to a {@link GraphAlgorithm} but is terminal
+ * and results are retrieved via accumulators. A Flink program has a single
+ * point of execution. A {@code GraphAnalytic} defers execution to the user to
+ * allow composing multiple analytics and algorithms into a single program.
+ *
+ * @param analytic the analytic to run on the Graph
+ * @param <T> the result type
+ * @throws Exception
+ */
+ public <T> void run(GraphAnalytic<K, VV, EV, T> analytic) throws Exception {
+ analytic.run(this);
+ }
+
+ /**
* Groups by vertex and computes a GroupReduce transformation over the neighbors (both edges and vertices)
* of each vertex. The neighborsFunction applied on the neighbors only has access to both the vertex id
* and the vertex value of the grouping vertex.
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
new file mode 100644
index 0000000..dd221dc
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalytic.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.io.OutputFormat;
+import org.apache.flink.api.java.DataSet;
+
+/**
+ * A {@code GraphAnalytic} is similar to a {@link GraphAlgorithm} but is terminal
+ * and results are retrieved via accumulators. A Flink program has a single
+ * point of execution. A {@code GraphAnalytic} defers execution to the user to
+ * allow composing multiple analytics and algorithms into a single program.
+ *
+ * @param <K> key type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @param <T> the return type
+ */
+public interface GraphAnalytic<K, VV, EV, T> {
+
+ /**
+ * This method must be called after the program has executed:
+ * 1) "run" analytics and algorithms
+ * 2) call ExecutionEnvironment.execute()
+ * 3) get analytics results
+ *
+ * @return the result
+ */
+ T getResult();
+
+ /**
+ * Execute the program and return the result.
+ *
+ * @return the result
+ * @throws Exception
+ */
+ T execute() throws Exception;
+
+ /**
+ * Execute the program and return the result.
+ *
+ * @param jobName the name to assign to the job
+ * @return the result
+ * @throws Exception
+ */
+ T execute(String jobName) throws Exception;
+
+ /**
+ * All {@code GraphAnalytic} processing must be terminated by an
+ * {@link OutputFormat}. Results are obtained via accumulators rather than
+ * returned by a {@link DataSet}.
+ *
+ * @param input input graph
+ * @return this
+ * @throws Exception
+ */
+ GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input) throws Exception;
+}
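The deferred-execution contract described in the `GraphAnalytic` Javadoc above can be illustrated without Flink. The following is a minimal sketch, not part of the commit: `MiniEnvironment` and `SumAnalytic` are hypothetical stand-ins for `ExecutionEnvironment` and a `GraphAnalytic` implementation. It shows that `run` only registers work, that a single `execute` call serves several analytics, and that `getResult` is meaningful only afterwards.

```java
import java.util.ArrayList;
import java.util.List;

public class DeferredAnalyticSketch {

    /** Hypothetical stand-in for ExecutionEnvironment: collects work and runs it once. */
    static class MiniEnvironment {
        private final List<Runnable> plan = new ArrayList<>();
        int executions = 0;

        void register(Runnable step) {
            plan.add(step);
        }

        void execute() {
            executions++;
            for (Runnable step : plan) {
                step.run();
            }
            plan.clear();
        }
    }

    /** Hypothetical analytic: run() only registers work; the result exists after execute(). */
    static class SumAnalytic {
        private final int[] data;
        private Long result; // stays null until the environment has executed

        SumAnalytic(int[] data) {
            this.data = data;
        }

        SumAnalytic run(MiniEnvironment env) {
            env.register(() -> {
                long sum = 0;
                for (int x : data) {
                    sum += x;
                }
                result = sum;
            });
            return this;
        }

        long getResult() {
            if (result == null) {
                throw new IllegalStateException("execute() has not been called");
            }
            return result;
        }
    }

    /** Composes two analytics into one program with a single point of execution. */
    static long[] demo() {
        MiniEnvironment env = new MiniEnvironment();
        SumAnalytic first = new SumAnalytic(new int[] {1, 2, 3}).run(env);
        SumAnalytic second = new SumAnalytic(new int[] {10, 20}).run(env);
        env.execute();
        return new long[] {first.getResult(), second.getResult(), env.executions};
    }

    public static void main(String[] args) {
        long[] r = demo();
        System.out.println(r[0] + " " + r[1] + " executions=" + r[2]);
    }
}
```

Calling `getResult` before `execute` throws in this sketch; the interface in the commit only documents the ordering requirement and does not state what happens if it is violated.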
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
deleted file mode 100644
index 1eafce2..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
+++ /dev/null
@@ -1,192 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.ReduceNeighborsFunction;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.VertexJoinFunction;
-import org.apache.flink.types.NullValue;
-
-import java.util.TreeMap;
-
-/**
- * Triangle Count Algorithm.
- *
- * This algorithm operates in three phases. First, vertices select neighbors with id greater than theirs
- * and send messages to them. Each received message is then propagated to neighbors with higher id.
- * Finally, if a node encounters the target id in the list of received messages, it increments the number
- * of triangles found.
- *
- * This implementation is non - iterative.
- *
- * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet
- * which contains a single integer representing the number of triangles.
- */
-public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements
- GraphAlgorithm<K, VV, EV, DataSet<Integer>> {
-
- @SuppressWarnings("serial")
- @Override
- public DataSet<Integer> run(Graph<K, VV, EV> input) throws Exception {
-
- ExecutionEnvironment env = input.getContext();
-
- // order the edges so that src is always higher than trg
- DataSet<Edge<K, NullValue>> edges = input.getEdges().map(new OrderEdges<K, EV>()).distinct();
-
- Graph<K, TreeMap<K, Integer>, NullValue> graph = Graph.fromDataSet(edges,
- new VertexInitializer<K>(), env);
-
- // select neighbors with ids higher than the current vertex id
- // Gather: a no-op in this case
- // Sum: create the set of neighbors
- DataSet<Tuple2<K, TreeMap<K, Integer>>> higherIdNeighbors =
- graph.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
-
- Graph<K, TreeMap<K, Integer>, NullValue> graphWithReinitializedVertexValues =
- graph.mapVertices(new VertexInitializerEmptyTreeMap<K>());
-
- // Apply: attach the computed values to the vertices
- // joinWithVertices to update the node values
- DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithHigherIdNeighbors =
- graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues<K>()).getVertices();
-
- Graph<K, TreeMap<K,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors,
- edges, env);
-
- // propagate each received value to neighbors with higher id
- // Gather: a no-op in this case
- // Sum: propagate values
- DataSet<Tuple2<K, TreeMap<K, Integer>>> propagatedValues = graphWithNeighbors
- .reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
-
- // Apply: attach propagated values to vertices
- DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithPropagatedValues =
- graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues<K>()).getVertices();
-
- Graph<K, TreeMap<K, Integer>, NullValue> graphWithPropagatedNeighbors =
- Graph.fromDataSet(verticesWithPropagatedValues, graphWithNeighbors.getEdges(), env);
-
- // Scatter: compute the number of triangles
- DataSet<Integer> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets()
- .map(new ComputeTriangles<K>()).reduce(new ReduceFunction<Integer>() {
-
- @Override
- public Integer reduce(Integer first, Integer second) throws Exception {
- return first + second;
- }
- });
-
- return numberOfTriangles;
- }
-
- @SuppressWarnings("serial")
- private static final class OrderEdges<K extends Comparable<K>, EV> implements
- MapFunction<Edge<K, EV>, Edge<K, NullValue>> {
-
- @Override
- public Edge<K, NullValue> map(Edge<K, EV> edge) throws Exception {
- if (edge.getSource().compareTo(edge.getTarget()) < 0) {
- return new Edge<K, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance());
- } else {
- return new Edge<K, NullValue>(edge.getSource(), edge.getTarget(), NullValue.getInstance());
- }
- }
- }
-
- @SuppressWarnings("serial")
- private static final class VertexInitializer<K> implements MapFunction<K, TreeMap<K, Integer>> {
-
- @Override
- public TreeMap<K, Integer> map(K value) throws Exception {
- TreeMap<K, Integer> neighbors = new TreeMap<K, Integer>();
- neighbors.put(value, 1);
-
- return neighbors;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class VertexInitializerEmptyTreeMap<K> implements
- MapFunction<Vertex<K, TreeMap<K, Integer>>, TreeMap<K, Integer>> {
-
- @Override
- public TreeMap<K, Integer> map(Vertex<K, TreeMap<K, Integer>> vertex) throws Exception {
- return new TreeMap<K, Integer>();
- }
- }
-
- @SuppressWarnings("serial")
- private static final class AttachValues<K> implements VertexJoinFunction<TreeMap<K, Integer>,
- TreeMap<K, Integer>> {
-
- @Override
- public TreeMap<K, Integer> vertexJoin(TreeMap<K, Integer> vertexValue,
- TreeMap<K, Integer> inputValue) {
- return inputValue;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class GatherHigherIdNeighbors<K> implements
- ReduceNeighborsFunction<TreeMap<K,Integer>> {
-
- @Override
- public TreeMap<K, Integer> reduceNeighbors(TreeMap<K,Integer> first, TreeMap<K,Integer> second) {
- for (K key : second.keySet()) {
- Integer value = first.get(key);
- if (value != null) {
- first.put(key, value + second.get(key));
- } else {
- first.put(key, second.get(key));
- }
- }
- return first;
- }
- }
-
- @SuppressWarnings("serial")
- private static final class ComputeTriangles<K> implements MapFunction<Triplet<K, TreeMap<K, Integer>, NullValue>,
- Integer> {
-
- @Override
- public Integer map(Triplet<K, TreeMap<K, Integer>, NullValue> triplet) throws Exception {
-
- Vertex<K, TreeMap<K, Integer>> srcVertex = triplet.getSrcVertex();
- Vertex<K, TreeMap<K, Integer>> trgVertex = triplet.getTrgVertex();
- int triangles = 0;
-
- if(trgVertex.getValue().get(srcVertex.getId()) != null) {
- triangles = trgVertex.getValue().get(srcVertex.getId());
- }
- return triangles;
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
new file mode 100644
index 0000000..fc89e43
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result;
+import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
+import org.apache.flink.types.CopyableValue;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * The global clustering coefficient measures the connectedness of a graph.
+ * Scores range from 0.0 (no triangles) to 1.0 (complete graph).
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class GlobalClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+ private TriangleCount<K, VV, EV> triangleCount;
+
+ private VertexMetrics<K, VV, EV> vertexMetrics;
+
+ // Optional configuration
+ private int littleParallelism = PARALLELISM_DEFAULT;
+
+ /**
+ * Override the parallelism of operators processing small amounts of data.
+ *
+ * @param littleParallelism operator parallelism
+ * @return this
+ */
+ public GlobalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
+ this.littleParallelism = littleParallelism;
+
+ return this;
+ }
+
+ @Override
+ public GlobalClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
+ throws Exception {
+ super.run(input);
+
+ triangleCount = new TriangleCount<K, VV, EV>()
+ .setLittleParallelism(littleParallelism);
+
+ input.run(triangleCount);
+
+ vertexMetrics = new VertexMetrics<K, VV, EV>()
+ .setParallelism(littleParallelism);
+
+ input.run(vertexMetrics);
+
+ return this;
+ }
+
+ @Override
+ public Result getResult() {
+ return new Result(vertexMetrics.getResult().getNumberOfTriplets(), 3 * triangleCount.getResult());
+ }
+
+ /**
+ * Wraps global clustering coefficient metrics.
+ */
+ public static class Result {
+ private long tripletCount;
+ private long triangleCount;
+
+ public Result(long tripletCount, long triangleCount) {
+ this.tripletCount = tripletCount;
+ this.triangleCount = triangleCount;
+ }
+
+ /**
+ * Get the number of triplets.
+ *
+ * @return number of triplets
+ */
+ public long getNumberOfTriplets() {
+ return tripletCount;
+ }
+
+ /**
+ * Get the number of triangles.
+ *
+ * @return number of triangles
+ */
+ public long getNumberOfTriangles() {
+ return triangleCount;
+ }
+
+ /**
+ * Get the global clustering coefficient score. This is computed as the
+ * number of closed triplets (triangles) divided by the total number of
+ * triplets.
+ *
+ * A score of {@code Double.NaN} is returned for a graph with no triplets, such as
+ * a graph of isolated vertices, for which the triplet and triangle counts are both zero.
+ *
+ * @return global clustering coefficient score
+ */
+ public double getLocalClusteringCoefficientScore() {
+ return (tripletCount == 0) ? Double.NaN : triangleCount / (double)tripletCount;
+ }
+
+ @Override
+ public String toString() {
+ return "triplet count: " + tripletCount + ", triangle count: " + triangleCount;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(tripletCount)
+ .append(triangleCount)
+ .hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) { return false; }
+ if (obj == this) { return true; }
+ if (obj.getClass() != getClass()) { return false; }
+
+ Result rhs = (Result)obj;
+
+ return new EqualsBuilder()
+ .append(tripletCount, rhs.tripletCount)
+ .append(triangleCount, rhs.triangleCount)
+ .isEquals();
+ }
+ }
+}
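The arithmetic behind `Result` can be checked on small graphs. The sketch below is a brute-force illustration and not the commit's implementation: the class and method names are hypothetical, vertices are assumed to be the integers `0..n-1`, and the graph is assumed simple (no self-loops). A vertex of degree d is the center of d(d-1)/2 triplets, and each triangle closes three triplets, which is why the commit multiplies the triangle count by three before dividing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class GlobalClusteringSketch {

    /** Builds a symmetric adjacency list; duplicate edges collapse in the sets. */
    static List<Set<Integer>> adjacency(int vertexCount, int[][] edges) {
        List<Set<Integer>> adj = new ArrayList<>();
        for (int i = 0; i < vertexCount; i++) {
            adj.add(new TreeSet<>());
        }
        for (int[] e : edges) {
            adj.get(e[0]).add(e[1]);
            adj.get(e[1]).add(e[0]);
        }
        return adj;
    }

    /** Each vertex of degree d is the center of d * (d - 1) / 2 triplets. */
    static long countTriplets(List<Set<Integer>> adj) {
        long triplets = 0;
        for (Set<Integer> neighbors : adj) {
            long d = neighbors.size();
            triplets += d * (d - 1) / 2;
        }
        return triplets;
    }

    /** Counts each triangle exactly once by requiring u < v < w. */
    static long countTriangles(List<Set<Integer>> adj) {
        long triangles = 0;
        for (int u = 0; u < adj.size(); u++) {
            for (int v : adj.get(u)) {
                if (v <= u) {
                    continue;
                }
                for (int w : adj.get(v)) {
                    if (w > v && adj.get(u).contains(w)) {
                        triangles++;
                    }
                }
            }
        }
        return triangles;
    }

    /** Closed triplets (3 per triangle) over all triplets; NaN when there are no triplets. */
    static double coefficient(long triplets, long triangles) {
        return (triplets == 0) ? Double.NaN : 3 * triangles / (double) triplets;
    }

    static double coefficientOf(int vertexCount, int[][] edges) {
        List<Set<Integer>> adj = adjacency(vertexCount, edges);
        return coefficient(countTriplets(adj), countTriangles(adj));
    }

    public static void main(String[] args) {
        // Triangle: 3 triplets, 1 triangle -> 1.0
        System.out.println(coefficientOf(3, new int[][] {{0, 1}, {1, 2}, {0, 2}}));
        // Path 0-1-2: 1 triplet, 0 triangles -> 0.0
        System.out.println(coefficientOf(3, new int[][] {{0, 1}, {1, 2}}));
        // Isolated vertices: no triplets -> NaN
        System.out.println(coefficientOf(3, new int[][] {}));
    }
}
```

Note that in the commit `Result.getNumberOfTriangles()` returns the value passed to the constructor, which `getResult()` has already multiplied by three; this sketch keeps the raw triangle count and applies the factor in `coefficient` instead.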
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index d1618d1..bc62d36 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -185,7 +185,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
}
/**
- * Wraps the vertex type to encapsulate results from the Clustering Coefficient algorithm.
+ * Wraps the vertex type to encapsulate results from the local clustering coefficient algorithm.
*
* @param <T> ID type
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
new file mode 100644
index 0000000..bc43725
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleCount.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.CountHelper;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.AbstractID;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Count the number of distinct triangles in an undirected graph.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ * @see TriangleListing
+ */
+public class TriangleCount<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Long> {
+
+ private String id = new AbstractID().toString();
+
+ // Optional configuration
+ private int littleParallelism = PARALLELISM_DEFAULT;
+
+ /**
+ * Override the parallelism of operators processing small amounts of data.
+ *
+ * @param littleParallelism operator parallelism
+ * @return this
+ */
+ public TriangleCount<K, VV, EV> setLittleParallelism(int littleParallelism) {
+ this.littleParallelism = littleParallelism;
+
+ return this;
+ }
+
+ @Override
+ public TriangleCount<K, VV, EV> run(Graph<K, VV, EV> input)
+ throws Exception {
+ super.run(input);
+
+ DataSet<Tuple3<K, K, K>> triangles = input
+ .run(new TriangleListing<K, VV, EV>()
+ .setSortTriangleVertices(false)
+ .setLittleParallelism(littleParallelism));
+
+ triangles.output(new CountHelper<Tuple3<K, K, K>>(id)).name("Count triangles");
+
+ return this;
+ }
+
+ @Override
+ public Long getResult() {
+ return env.getLastJobExecutionResult().<Long> getAccumulatorResult(id);
+ }
+}
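For readers who want to check small expected values by hand, the following brute-force sketch counts distinct triangles in an undirected edge list. It is not the TriangleListing algorithm used by the class above, only a cubic-time reference. The class name, method name, and the sample edge list are assumptions for illustration; the edge list was chosen so that it yields the 2 triangles asserted for the simple graph in TriangleCountTest.

```java
import java.util.HashSet;
import java.util.Set;

class TriangleCountSketch {
    // Encode an undirected edge (lo, hi) with lo < hi as a single long key.
    private static long key(int lo, int hi, int vertexCount) {
        return (long) lo * vertexCount + hi;
    }

    // Hypothetical reference implementation: tests every vertex triple u < v < w.
    static long countTriangles(int vertexCount, int[][] edges) {
        Set<Long> edgeSet = new HashSet<>();
        for (int[] e : edges) {
            edgeSet.add(key(Math.min(e[0], e[1]), Math.max(e[0], e[1]), vertexCount));
        }

        long count = 0;
        for (int u = 0; u < vertexCount; u++) {
            for (int v = u + 1; v < vertexCount; v++) {
                if (!edgeSet.contains(key(u, v, vertexCount))) {
                    continue;
                }
                for (int w = v + 1; w < vertexCount; w++) {
                    if (edgeSet.contains(key(u, w, vertexCount))
                            && edgeSet.contains(key(v, w, vertexCount))) {
                        count++;
                    }
                }
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Assumed sample graph with six vertices and seven edges.
        int[][] edges = {{0, 1}, {0, 2}, {1, 2}, {1, 3}, {2, 3}, {3, 4}, {3, 5}};
        System.out.println("triangle count: " + countTriangles(6, edges));
    }
}
```

Ordering the vertices as u < v < w counts each triangle once, which matches the "distinct triangles" wording in the class comment.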
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 1319d02..6245433 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -286,11 +286,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
@Override
public Tuple3<T, T, T> map(Tuple3<T, T, T> value)
throws Exception {
- T temp_val;
-
// by the triangle listing algorithm we know f1 < f2
if (value.f0.compareTo(value.f1) > 0) {
- temp_val = value.f0;
+ T temp_val = value.f0;
value.f0 = value.f1;
if (temp_val.compareTo(value.f2) <= 0) {
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
new file mode 100644
index 0000000..41ae27a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.undirected;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Compute the number of vertices, number of edges, and number of triplets in
+ * an undirected graph.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class VertexMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+ private String id = new AbstractID().toString();
+
+ // Optional configuration
+ private boolean includeZeroDegreeVertices = false;
+
+ private boolean reduceOnTargetId = false;
+
+ private int parallelism = PARALLELISM_DEFAULT;
+
+ /**
+ * By default only the edge set is processed for the computation of degree.
+ * When this flag is set an additional join is performed against the vertex
+ * set in order to output vertices with a degree of zero.
+ *
+ * @param includeZeroDegreeVertices whether to output vertices with a
+ * degree of zero
+ * @return this
+ */
+ public VertexMetrics<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
+ this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+
+ return this;
+ }
+
+ /**
+ * The degree can be counted from either the edge source or target IDs.
+ * By default the source IDs are counted. Reducing on target IDs may
+ * optimize the algorithm if the input edge list is sorted by target ID.
+ *
+ * @param reduceOnTargetId set to {@code true} if the input edge list
+ * is sorted by target ID
+ * @return this
+ */
+ public VertexMetrics<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) {
+ this.reduceOnTargetId = reduceOnTargetId;
+
+ return this;
+ }
+
+ /**
+ * Override the operator parallelism.
+ *
+ * @param parallelism operator parallelism
+ * @return this
+ */
+ public VertexMetrics<K, VV, EV> setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+
+ return this;
+ }
+
+ @Override
+ public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
+ throws Exception {
+ super.run(input);
+
+ DataSet<Vertex<K, LongValue>> vertexDegree = input
+ .run(new VertexDegree<K, VV, EV>()
+ .setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
+ .setReduceOnTargetId(reduceOnTargetId)
+ .setParallelism(parallelism));
+
+ vertexDegree.output(new VertexMetricsHelper<K>(id)).name("Vertex metrics");
+
+ return this;
+ }
+
+ @Override
+ public Result getResult() {
+ JobExecutionResult res = env.getLastJobExecutionResult();
+
+ long vertexCount = res.getAccumulatorResult(id + "-0");
+ long edgeCount = res.getAccumulatorResult(id + "-1");
+ long tripletCount = res.getAccumulatorResult(id + "-2");
+
+ return new Result(vertexCount, edgeCount / 2, tripletCount);
+ }
+
+ /**
+ * Helper class to collect vertex metrics.
+ *
+ * @param <T> ID type
+ */
+ private static class VertexMetricsHelper<T>
+ extends RichOutputFormat<Vertex<T, LongValue>> {
+ private final String id;
+
+ private long vertexCount;
+ private long edgeCount;
+ private long tripletCount;
+
+ /**
+ * This helper class collects vertex metrics by scanning over and
+ * discarding elements from the given DataSet.
+ *
+ * The unique id is required because Flink's accumulator namespace is
+ * shared among all operators.
+ *
+ * @param id unique string used for accumulator names
+ */
+ public VertexMetricsHelper(String id) {
+ this.id = id;
+ }
+
+ @Override
+ public void configure(Configuration parameters) {}
+
+ @Override
+ public void open(int taskNumber, int numTasks) throws IOException {}
+
+ @Override
+ public void writeRecord(Vertex<T, LongValue> record) throws IOException {
+ long degree = record.f1.getValue();
+
+ vertexCount++;
+ edgeCount += degree;
+ tripletCount += degree * (degree - 1) / 2;
+ }
+
+ @Override
+ public void close() throws IOException {
+ getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
+ getRuntimeContext().addAccumulator(id + "-1", new LongCounter(edgeCount));
+ getRuntimeContext().addAccumulator(id + "-2", new LongCounter(tripletCount));
+ }
+ }
+
+ /**
+ * Wraps vertex metrics.
+ */
+ public static class Result {
+ private long vertexCount;
+ private long edgeCount;
+ private long tripletCount;
+
+ public Result(long vertexCount, long edgeCount, long tripletCount) {
+ this.vertexCount = vertexCount;
+ this.edgeCount = edgeCount;
+ this.tripletCount = tripletCount;
+ }
+
+ /**
+ * Get the number of vertices.
+ *
+ * @return number of vertices
+ */
+ public long getNumberOfVertices() {
+ return vertexCount;
+ }
+
+ /**
+ * Get the number of edges.
+ *
+ * @return number of edges
+ */
+ public long getNumberOfEdges() {
+ return edgeCount;
+ }
+
+ /**
+ * Get the number of triplets.
+ *
+ * @return number of triplets
+ */
+ public long getNumberOfTriplets() {
+ return tripletCount;
+ }
+
+ @Override
+ public String toString() {
+ return "vertex count: " + vertexCount
+ + ", edge count: " + edgeCount
+ + ", triplet count: " + tripletCount;
+ }
+
+ @Override
+ public int hashCode() {
+ return new HashCodeBuilder()
+ .append(vertexCount)
+ .append(edgeCount)
+ .append(tripletCount)
+ .hashCode();
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) { return false; }
+ if (obj == this) { return true; }
+ if (obj.getClass() != getClass()) { return false; }
+
+ Result rhs = (Result)obj;
+
+ return new EqualsBuilder()
+ .append(vertexCount, rhs.vertexCount)
+ .append(edgeCount, rhs.edgeCount)
+ .append(tripletCount, rhs.tripletCount)
+ .isEquals();
+ }
+ }
+}
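The accumulation performed by VertexMetricsHelper.writeRecord and getResult above can be traced with a stand-alone sketch. The class and method names are hypothetical, and the degree sequence is an assumption chosen to reproduce the Result(6, 7, 13) expected for the simple graph in VertexMetricsTest. Each vertex of degree d contributes d to the degree sum and d * (d - 1) / 2 triplets; the degree sum is halved because every undirected edge is seen from both endpoints.

```java
class VertexMetricsSketch {
    // Hypothetical helper: returns {vertexCount, edgeCount, tripletCount}
    // computed from per-vertex degrees, mirroring the helper class above.
    static long[] metrics(long[] degrees) {
        long vertexCount = 0;
        long degreeSum = 0;
        long tripletCount = 0;

        for (long degree : degrees) {
            vertexCount++;
            degreeSum += degree;
            // Number of unordered neighbor pairs centered on this vertex.
            tripletCount += degree * (degree - 1) / 2;
        }

        // Each undirected edge was counted once per endpoint.
        return new long[] {vertexCount, degreeSum / 2, tripletCount};
    }

    public static void main(String[] args) {
        long[] degrees = {2, 3, 3, 4, 1, 1}; // assumed degree sequence
        long[] m = metrics(degrees);
        // prints "vertex count: 6, edge count: 7, triplet count: 13"
        System.out.println("vertex count: " + m[0]
            + ", edge count: " + m[1]
            + ", triplet count: " + m[2]);
    }
}
```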
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java
new file mode 100644
index 0000000..71ec2a6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficientTest.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class GlobalClusteringCoefficientTest
+extends AsmTestBase {
+
+ @Test
+ public void testWithSimpleGraph()
+ throws Exception {
+ Result expectedResult = new Result(13, 6);
+
+ Result globalClusteringCoefficient = new GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
+ .run(undirectedSimpleGraph)
+ .execute();
+
+ assertEquals(expectedResult, globalClusteringCoefficient);
+ }
+
+ @Test
+ public void testWithCompleteGraph()
+ throws Exception {
+ long expectedDegree = completeGraphVertexCount - 1;
+ long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+
+ Result expectedResult = new Result(expectedCount, expectedCount);
+
+ Result globalClusteringCoefficient = new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .run(completeGraph)
+ .execute();
+
+ assertEquals(expectedResult, globalClusteringCoefficient);
+ }
+
+ @Test
+ public void testWithEmptyGraph()
+ throws Exception {
+ Result expectedResult = new Result(0, 0);
+
+ Result globalClusteringCoefficient = new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .run(emptyGraph)
+ .execute();
+
+ assertEquals(expectedResult, globalClusteringCoefficient);
+ }
+
+ @Test
+ public void testWithRMatGraph()
+ throws Exception {
+ Result expectedResult = new Result(1003442, 225147);
+
+ Result globalClusteringCoefficient = new GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .run(undirectedRMatGraph)
+ .execute();
+
+ assertEquals(expectedResult, globalClusteringCoefficient);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
index 414f200..3455df4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficientTest.java
@@ -30,6 +30,8 @@ import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.junit.Test;
+import java.util.List;
+
import static org.junit.Assert.assertEquals;
public class LocalClusteringCoefficientTest
@@ -61,7 +63,11 @@ extends AsmTestBase {
DataSet<Result<LongValue>> cc = completeGraph
.run(new LocalClusteringCoefficient<LongValue, NullValue, NullValue>());
- for (Result<LongValue> result : cc.collect()) {
+ List<Result<LongValue>> results = cc.collect();
+
+ assertEquals(completeGraphVertexCount, results.size());
+
+ for (Result<LongValue> result : results) {
assertEquals(expectedDegree, result.getDegree().getValue());
assertEquals(expectedTriangleCount, result.getTriangleCount().getValue());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java
new file mode 100644
index 0000000..6bf9b0d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriangleCountTest.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TriangleCountTest
+extends AsmTestBase {
+
+ @Test
+ public void testWithSimpleGraph()
+ throws Exception {
+ long triangleCount = new TriangleCount<IntValue, NullValue, NullValue>()
+ .run(undirectedSimpleGraph)
+ .execute();
+
+ assertEquals(2, triangleCount);
+ }
+
+ @Test
+ public void testWithCompleteGraph()
+ throws Exception {
+ long expectedDegree = completeGraphVertexCount - 1;
+ long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2) / 3;
+
+ long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
+ .run(completeGraph)
+ .execute();
+
+ assertEquals(expectedCount, triangleCount);
+ }
+
+ @Test
+ public void testWithEmptyGraph()
+ throws Exception {
+ long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
+ .run(emptyGraph)
+ .execute();
+
+ assertEquals(0, triangleCount);
+ }
+
+ @Test
+ public void testWithRMatGraph()
+ throws Exception {
+ long triangleCount = new TriangleCount<LongValue, NullValue, NullValue>()
+ .run(undirectedRMatGraph)
+ .execute();
+
+ assertEquals(75049, triangleCount);
+ }
+}
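The expected values in the complete-graph tests follow from simple counting: in a complete graph on n vertices every vertex has degree n - 1, so it is the center of C(n - 1, 2) triplets, and each triangle is counted once per corner. The sketch below reproduces that arithmetic without commons-math. The class name, method names, and the choice n = 5 are assumptions for illustration, since the value of completeGraphVertexCount is not shown in this part of the diff.

```java
class CompleteGraphCountsSketch {
    // Binomial coefficient C(n, 2), the number of unordered pairs among n items.
    static long choose2(long n) {
        return n * (n - 1) / 2;
    }

    // Triplets in a complete graph: each of the n vertices centers C(n - 1, 2) of them.
    static long triplets(long n) {
        return n * choose2(n - 1);
    }

    // Distinct triangles: every triangle is counted once per corner, hence the division by 3.
    static long triangles(long n) {
        return triplets(n) / 3;
    }

    public static void main(String[] args) {
        long n = 5; // assumed vertex count for illustration
        // prints "triplets: 30, triangles: 10"
        System.out.println("triplets: " + triplets(n) + ", triangles: " + triangles(n));
    }
}
```

Because every triplet in a complete graph is closed, the closed-triplet count equals the triplet count, which is why GlobalClusteringCoefficientTest expects the same number in both fields of the Result.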
http://git-wip-us.apache.org/repos/asf/flink/blob/8da1a75c/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
new file mode 100644
index 0000000..a36ca94
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/metric/undirected/VertexMetricsTest.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.metric.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class VertexMetricsTest
+extends AsmTestBase {
+
+ @Test
+ public void testWithSimpleGraph()
+ throws Exception {
+ Result expectedResult = new Result(6, 7, 13);
+
+ Result vertexMetrics = new VertexMetrics<IntValue, NullValue, NullValue>()
+ .run(undirectedSimpleGraph)
+ .execute();
+
+ assertEquals(expectedResult, vertexMetrics);
+ }
+
+ @Test
+ public void testWithCompleteGraph()
+ throws Exception {
+ long expectedDegree = completeGraphVertexCount - 1;
+ long expectedEdges = completeGraphVertexCount * expectedDegree / 2;
+ long expectedTriplets = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2);
+
+ Result expectedResult = new Result(completeGraphVertexCount, expectedEdges, expectedTriplets);
+
+ Result vertexMetrics = new VertexMetrics<LongValue, NullValue, NullValue>()
+ .run(completeGraph)
+ .execute();
+
+ assertEquals(expectedResult, vertexMetrics);
+ }
+
+ @Test
+ public void testWithEmptyGraph()
+ throws Exception {
+ Result expectedResult;
+
+ expectedResult = new Result(0, 0, 0);
+
+ Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(false)
+ .run(emptyGraph)
+ .execute();
+
+ assertEquals(expectedResult, withoutZeroDegreeVertices);
+
+ expectedResult = new Result(3, 0, 0);
+
+ Result withZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(true)
+ .run(emptyGraph)
+ .execute();
+
+ assertEquals(expectedResult, withZeroDegreeVertices);
+ }
+
+ @Test
+ public void testWithRMatGraph()
+ throws Exception {
+ Result expectedResult = new Result(902, 10442, 1003442);
+
+ Result withoutZeroDegreeVertices = new VertexMetrics<LongValue, NullValue, NullValue>()
+ .run(undirectedRMatGraph)
+ .execute();
+
+ assertEquals(expectedResult, withoutZeroDegreeVertices);
+ }
+}