You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/11/10 17:14:13 UTC

[4/4] flink git commit: [FLINK-4934] [gelly] Triadic Census

[FLINK-4934] [gelly] Triadic Census

A triad is any three vertices in a graph. The triadic census produces
counts for each of the 4 undirected and 16 directed triad types.

This closes #2731


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/86a3dd58
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/86a3dd58
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/86a3dd58

Branch: refs/heads/master
Commit: 86a3dd586ebfcbc73e048a85a065dc579cb5c880
Parents: ed09dba
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Oct 31 10:34:33 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Nov 10 10:46:32 2016 -0500

----------------------------------------------------------------------
 docs/dev/libs/gelly/library_methods.md          | 224 ++++++--
 .../flink/graph/drivers/TriangleListing.java    |  86 ++-
 .../clustering/directed/TriadicCensus.java      | 562 +++++++++++++++++++
 .../clustering/undirected/TriadicCensus.java    | 237 ++++++++
 .../library/metric/directed/EdgeMetrics.java    |   5 +-
 .../library/metric/directed/VertexMetrics.java  |   5 +-
 .../library/metric/undirected/EdgeMetrics.java  |   5 +-
 .../metric/undirected/VertexMetrics.java        |   5 +-
 .../clustering/directed/TriadicCensusTest.java  | 123 ++++
 .../undirected/TriadicCensusTest.java           |  94 ++++
 10 files changed, 1259 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/86a3dd58/docs/dev/libs/gelly/library_methods.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/library_methods.md b/docs/dev/libs/gelly/library_methods.md
index 020f1e7..56ed64e 100644
--- a/docs/dev/libs/gelly/library_methods.md
+++ b/docs/dev/libs/gelly/library_methods.md
@@ -201,12 +201,6 @@ Counts the triangles generated by [Triangle Listing](#triangle-listing).
 The analytic takes an undirected graph as input and returns as a result a `Long` corresponding to the number of triangles
 in the graph. The graph ID type must be `Comparable` and `Copyable`.
 
-## Triangle Listing
-
-This algorithm supports object reuse. The graph ID type must be `Comparable` and `Copyable`.
-
-See the [Triangle Enumerator](#triangle-enumerator) library method for implementation details.
-
 ## Triangle Enumerator
 
 #### Overview
@@ -224,22 +218,6 @@ This implementation extends the basic algorithm by computing output degrees of e
 The algorithm takes a directed graph as input and outputs a `DataSet` of `Tuple3`. The Vertex ID type has to be `Comparable`.
 Each `Tuple3` corresponds to a triangle, with the fields containing the IDs of the vertices forming the triangle.
 
-## Hyperlink-Induced Topic Search
-
-#### Overview
-[Hyperlink-Induced Topic Search](http://www.cs.cornell.edu/home/kleinber/auth.pdf) (HITS, or "Hubs and Authorities")
-computes two interdependent scores for every vertex in a directed graph. Good hubs are those which point to many
-good authorities and good authorities are those pointed to by many good hubs.
-
-#### Details
-Every vertex is assigned the same initial hub and authority scores. The algorithm then iteratively updates the scores
-until termination. During each iteration new hub scores are computed from the authority scores, then new authority
-scores are computed from the new hub scores. The scores are then normalized and optionally tested for convergence.
-
-#### Usage
-The algorithm takes a directed graph as input and outputs a `DataSet` of `Tuple3` containing the vertex ID, hub score,
-and authority score.
-
 ## Summarization
 
 #### Overview
@@ -264,7 +242,171 @@ The algorithm takes a directed, vertex (and possibly edge) attributed graph as i
 vertex represents a group of vertices and each edge represents a group of edges from the input graph. Furthermore, each
 vertex and edge in the output graph stores the common group value and the number of represented elements.
 
-## Adamic-Adar
+## Clustering
+
+### Average Clustering Coefficient
+
+#### Overview
+The average clustering coefficient measures the mean connectedness of a graph. Scores range from 0.0 (no edges between
+neighbors) to 1.0 (complete graph).
+
+#### Details
+See the [Local Clustering Coefficient](#local-clustering-coefficient) library method for a detailed explanation of
+clustering coefficient.
+
+#### Usage
+Directed and undirected variants are provided. The algorithm takes a simple graph as input and outputs a result
+containing the total number of vertices and average local clustering coefficient of the graph. The graph ID type must be
+`Comparable` and `Copyable`.
+
+* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+
+### Global Clustering Coefficient
+
+#### Overview
+The global clustering coefficient measures the connectedness of a graph. Scores range from 0.0 (no edges between
+neighbors) to 1.0 (complete graph).
+
+#### Details
+See the [Local Clustering Coefficient](#local-clustering-coefficient) library method for a detailed explanation of
+clustering coefficient.
+
+#### Usage
+Directed and undirected variants are provided. The algorithm takes a simple graph as input and outputs a result
+containing the total number of triplets and triangles in the graph. The graph ID type must be `Comparable` and
+`Copyable`.
+
+* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+
+### Local Clustering Coefficient
+
+#### Overview
+The local clustering coefficient measures the connectedness of each vertex's neighborhood. Scores range from 0.0 (no
+edges between neighbors) to 1.0 (neighborhood is a clique).
+
+#### Details
+An edge between a vertex's neighbors is a triangle. Counting edges between neighbors is equivalent to counting the
+number of triangles which include the vertex. The clustering coefficient score is the number of edges between neighbors
+divided by the number of potential edges between neighbors.
+
+See the [Triangle Enumerator](#triangle-enumerator) library method for a detailed explanation of triangle enumeration.
+
+#### Usage
+Directed and undirected variants are provided. The algorithms take a simple graph as input and outputs a `DataSet` of
+tuples containing the vertex ID, vertex degree, and number of triangles containing the vertex. The graph ID type must be
+`Comparable` and `Copyable`.
+
+* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+* `setIncludeZeroDegreeVertices`: include results for vertices with a degree of zero
+
+### Triadic Census
+
+#### Overview
+A triad is formed by any three vertices in a graph. Each triad contains three pairs of vertices which may be connected
+or unconnected. The [Triadic Census](http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf) counts the
+occurrences of each type of triad with the graph.
+
+#### Details
+The analytics counts the four undirected triad types (formed with 0, 1, 2, or 3 connecting edges) or 16 directed triad
+types by counting the triangles from [Triangle Listing](#triangle-listing) and running [Vertex Metrics](#vertex-metrics)
+to obtain the number of triplets and edges. Triangle counts are then deducted from triplet counts, and triangle and
+triplet counts are removed from edge counts.
+
+#### Usage
+Directed and undirected variants are provided. The analytics take a simple graph as input and outputs a `Result` with
+accessor methods for the computed statistics. The graph ID type must be `Comparable` and `Copyable`.
+
+* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+
+### Triangle Listing
+
+#### Overview
+This algorithm supports object reuse. The graph ID type must be `Comparable` and `Copyable`.
+
+#### Details
+See the [Triangle Enumerator](#triangle-enumerator) library method for implementation details.
+
+#### Usage
+Directed and undirected variants are provided. The algorithms take a simple graph as input and outputs a `DataSet` of
+tuples containing the three triangle vertices and, for the directed algorithm, a bitmask marking each of the six
+potential triangle edges. The graph ID type must be `Comparable` and `Copyable`.
+
+* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+* `setSortTriangleVertices`: normalize the triangle listing such that for each result (K0, K1, K2) the vertex IDs are sorted K0 < K1 < K2
+
+## Link Analysis
+
+### Hyperlink-Induced Topic Search
+
+#### Overview
+[Hyperlink-Induced Topic Search](http://www.cs.cornell.edu/home/kleinber/auth.pdf) (HITS, or "Hubs and Authorities")
+computes two interdependent scores for every vertex in a directed graph. Good hubs are those which point to many
+good authorities and good authorities are those pointed to by many good hubs.
+
+#### Details
+Every vertex is assigned the same initial hub and authority scores. The algorithm then iteratively updates the scores
+until termination. During each iteration new hub scores are computed from the authority scores, then new authority
+scores are computed from the new hub scores. The scores are then normalized and optionally tested for convergence.
+
+#### Usage
+The algorithm takes a directed graph as input and outputs a `DataSet` of `Tuple3` containing the vertex ID, hub score,
+and authority score. Termination is configured with a maximum number of iterations and/or a convergence threshold
+on the sum of the change in each score for each vertex between iterations.
+
+## Metric
+
+### Vertex Metrics
+
+#### Overview
+This graph analytic computes the following statistics for both directed and undirected graphs:
+- number of vertices
+- number of edges
+- average degree
+- number of triplets
+- maximum degree
+- maximum number of triplets
+
+The following statistics are additionally computed for directed graphs:
+- number of unidirectional edges
+- number of bidirectional edges
+- maximum out degree
+- maximum in degree
+
+#### Details
+The statistics are computed over vertex degrees generated from `degree.annotate.directed.VertexDegrees` or
+`degree.annotate.undirected.VertexDegree`.
+
+#### Usage
+Directed and undirected variants are provided. The analytics take a simple graph as input and output a `Result` with
+accessor methods for the computed statistics. The graph ID type must be `Comparable`.
+
+* `setIncludeZeroDegreeVertices`: include results for vertices with a degree of zero
+* `setParallelism`: override the operator parallelism
+* `setReduceOnTargetId` (undirected only): the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID
+
+### Edge Metrics
+
+#### Overview
+This graph analytic computes the following statistics:
+- number of triangle triplets
+- number of rectangle triplets
+- maximum number of triangle triplets
+- maximum number of rectangle triplets
+
+#### Details
+The statistics are computed over edge degrees generated from `degree.annotate.directed.EdgeDegreesPair` or
+`degree.annotate.undirected.EdgeDegreePair` and grouped by vertex.
+
+#### Usage
+Directed and undirected variants are provided. The analytics take a simple graph as input and output a `Result` with
+accessor methods for the computed statistics. The graph ID type must be `Comparable`.
+
+* `setParallelism`: override the operator parallelism
+* `setReduceOnTargetId` (undirected only): the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID
+
+## Similarity
+
+### Adamic-Adar
 
 #### Overview
 Adamic-Adar measures the similarity between pairs of vertices as the sum of the inverse logarithm of degree over shared
@@ -286,7 +428,7 @@ the Adamic-Adair similarity score. The graph ID type must be `Comparable` and `C
 * `setMinimumRatio`: filter out Adamic-Adar scores less than the given ratio times the average score
 * `setMinimumScore`: filter out Adamic-Adar scores less than the given minimum
 
-## Jaccard Index
+### Jaccard Index
 
 #### Overview
 The Jaccard Index measures the similarity between vertex neighborhoods and is computed as the number of shared neighbors
@@ -310,38 +452,4 @@ Jaccard Index score. The graph ID type must be `Comparable` and `Copyable`.
 * `setMaximumScore`: filter out Jaccard Index scores greater than or equal to the given maximum fraction
 * `setMinimumScore`: filter out Jaccard Index scores less than the given minimum fraction
 
-## Local Clustering Coefficient
-
-#### Overview
-The local clustering coefficient measures the connectedness of each vertex's neighborhood. Scores range from 0.0 (no
-edges between neighbors) to 1.0 (neighborhood is a clique).
-
-#### Details
-An edge between a vertex's neighbors is a triangle. Counting edges between neighbors is equivalent to counting the
-number of triangles which include the vertex. The clustering coefficient score is the number of edges between neighbors
-divided by the number of potential edges between neighbors.
-
-See the [Triangle Enumeration](#triangle-enumeration) library method for a detailed explanation of triangle enumeration.
-
-#### Usage
-Directed and undirected variants are provided. The algorithms take a simple graph as input and output a `DataSet` of
-tuples containing the vertex ID, vertex degree, and number of triangles containing the vertex. The graph ID type must be
-`Comparable` and `Copyable`.
-
-## Global Clustering Coefficient
-
-#### Overview
-The global clustering coefficient measures the connectedness of a graph. Scores range from 0.0 (no edges between
-neighbors) to 1.0 (complete graph).
-
-#### Details
-See the [Local Clustering Coefficient](#local-clustering-coefficient) library method for a detailed explanation of
-clustering coefficient.
-
-#### Usage
-Directed and undirected variants are provided. The algorithm takes a simple graph as input and outputs a result
-containing the total number of triplets and triangles in the graph. The graph ID type must be `Comparable` and
-`Copyable`.
-
-
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/86a3dd58/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 92f6a2c..818b0d8 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -30,8 +30,8 @@ import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.GraphCsvReader;
-import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.generator.RMatGraph;
@@ -62,6 +62,8 @@ public class TriangleListing {
 
 	private static final int DEFAULT_EDGE_FACTOR = 16;
 
+	private static final boolean DEFAULT_TRIADIC_CENSUS = true;
+
 	private static final boolean DEFAULT_CLIP_AND_FLIP = true;
 
 	private static String getUsage(String message) {
@@ -72,7 +74,7 @@ public class TriangleListing {
 			.appendln(WordUtils.wrap("This algorithm returns tuples containing the vertex IDs for each triangle and" +
 				" for directed graphs a bitmask indicating the presence of the six potential connecting edges.", 80))
 			.appendNewLine()
-			.appendln("usage: TriangleListing --directed <true | false> --input <csv | rmat [options]> --output <print | hash | csv [options]>")
+			.appendln("usage: TriangleListing --directed <true | false> [--triadic_census <true | false>] --input <csv | rmat> --output <print | hash | csv>")
 			.appendNewLine()
 			.appendln("options:")
 			.appendln("  --input csv --type <integer | string> [--simplify <true | false>] --input_filename FILENAME [--input_line_delimiter LINE_DELIMITER] [--input_field_delimiter FIELD_DELIMITER]")
@@ -100,7 +102,9 @@ public class TriangleListing {
 		boolean directedAlgorithm = parameters.getBoolean("directed");
 
 		int little_parallelism = parameters.getInt("little_parallelism", PARALLELISM_DEFAULT);
+		boolean triadic_census = parameters.getBoolean("triadic_census", DEFAULT_TRIADIC_CENSUS);
 
+		GraphAnalytic tc = null;
 		DataSet tl;
 
 		switch (parameters.get("input", "")) {
@@ -129,6 +133,11 @@ public class TriangleListing {
 										.setParallelism(little_parallelism));
 							}
 
+							if (triadic_census) {
+								tc = graph
+									.run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, NullValue, NullValue>()
+										.setLittleParallelism(little_parallelism));
+							}
 							tl = graph
 								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
@@ -139,6 +148,11 @@ public class TriangleListing {
 										.setParallelism(little_parallelism));
 							}
 
+							if (triadic_census) {
+								tc = graph
+									.run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<LongValue, NullValue, NullValue>()
+										.setLittleParallelism(little_parallelism));
+							}
 							tl = graph
 								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
@@ -156,6 +170,11 @@ public class TriangleListing {
 										.setParallelism(little_parallelism));
 							}
 
+							if (triadic_census) {
+								tc = graph
+									.run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<StringValue, NullValue, NullValue>()
+										.setLittleParallelism(little_parallelism));
+							}
 							tl = graph
 								.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<StringValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
@@ -166,6 +185,11 @@ public class TriangleListing {
 										.setParallelism(little_parallelism));
 							}
 
+							if (triadic_census) {
+								tc = graph
+									.run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<StringValue, NullValue, NullValue>()
+										.setLittleParallelism(little_parallelism));
+							}
 							tl = graph
 								.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<StringValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
@@ -193,38 +217,61 @@ public class TriangleListing {
 
 				if (directedAlgorithm) {
 					if (scale > 32) {
-						tl = graph
+						Graph<LongValue, NullValue, NullValue> simpleGraph = graph
 							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
-								.setParallelism(little_parallelism))
+								.setParallelism(little_parallelism));
+
+						if (triadic_census) {
+							tc = simpleGraph
+								.run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+						}
+						tl = simpleGraph
 							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()
 								.setLittleParallelism(little_parallelism));
 					} else {
-						tl = graph
-							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
-								.setParallelism(little_parallelism))
-							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<IntValue, NullValue, NullValue>()
-								.setParallelism(little_parallelism))
-							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<IntValue, NullValue, NullValue>()
+						Graph<LongValue, NullValue, NullValue> simpleGraph = graph
+							.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>()
+								.setParallelism(little_parallelism));
+
+						if (triadic_census) {
+							tc = simpleGraph
+								.run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+						}
+						tl = simpleGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<LongValue, NullValue, NullValue>()
 								.setLittleParallelism(little_parallelism));
 					}
 				} else {
 					boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
 
-					graph = graph
-						.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
-
 					if (scale > 32) {
-						tl = graph
+						Graph<LongValue, NullValue, NullValue> simpleGraph = graph
 							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(clipAndFlip)
-								.setParallelism(little_parallelism))
+								.setParallelism(little_parallelism));
+
+						if (triadic_census) {
+							tc = simpleGraph
+								.run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+						}
+						tl = simpleGraph
 							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<LongValue, NullValue, NullValue>()
 								.setLittleParallelism(little_parallelism));
 					} else {
-						tl = graph
+						Graph<IntValue, NullValue, NullValue> simpleGraph = graph
 							.run(new TranslateGraphIds<LongValue, IntValue, NullValue, NullValue>(new LongValueToUnsignedIntValue())
 								.setParallelism(little_parallelism))
 							.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<IntValue, NullValue, NullValue>(clipAndFlip)
-								.setParallelism(little_parallelism))
+								.setParallelism(little_parallelism));
+
+						if (triadic_census) {
+							tc = simpleGraph
+								.run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<IntValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
+						}
+						tl = simpleGraph
 							.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<IntValue, NullValue, NullValue>()
 								.setLittleParallelism(little_parallelism));
 					}
@@ -269,6 +316,11 @@ public class TriangleListing {
 				throw new ProgramParametrizationException(getUsage("invalid output type"));
 		}
 
+		if (tc != null) {
+			System.out.print("Triadic census:\n  ");
+			System.out.println(tc.getResult().toString().replace(";", "\n "));
+		}
+
 		JobExecutionResult result = env.getLastJobExecutionResult();
 
 		NumberFormat nf = NumberFormat.getInstance();

http://git-wip-us.apache.org/repos/asf/flink/blob/86a3dd58/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
new file mode 100644
index 0000000..6c9e7b6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
@@ -0,0 +1,562 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.AnalyticHelper;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.library.clustering.directed.TriadicCensus.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.Preconditions;
+
+import java.io.IOException;
+import java.math.BigInteger;
+import java.text.NumberFormat;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * A triad is formed by three connected or unconnected vertices in a graph.
+ * The triadic census counts the occurrences of each type of triad.
+ * <br/>
+ * http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class TriadicCensus<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+	private TriangleListingHelper<K> triangleListingHelper;
+
+	private VertexDegreesHelper<K> vertexDegreesHelper;
+
+	// Optional configuration
+	private int littleParallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Override the parallelism of operators processing small amounts of data.
+	 *
+	 * @param littleParallelism operator parallelism
+	 * @return this
+	 */
+	public TriadicCensus<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		this.littleParallelism = littleParallelism;
+
+		return this;
+	}
+
+	@Override
+	public TriadicCensus<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		triangleListingHelper = new TriangleListingHelper<>();
+
+		input
+			.run(new TriangleListing<K, VV, EV>()
+				.setLittleParallelism(littleParallelism))
+			.output(triangleListingHelper)
+				.name("Triangle counts");
+
+		vertexDegreesHelper = new VertexDegreesHelper<>();
+
+		input
+			.run(new VertexDegrees<K, VV, EV>()
+				.setParallelism(littleParallelism))
+			.output(vertexDegreesHelper)
+				.name("Edge and triplet counts");
+
+		return this;
+	}
+
+	@Override
+	public Result getResult() {
+		BigInteger one = BigInteger.ONE;
+		BigInteger two = BigInteger.valueOf(2);
+		BigInteger three = BigInteger.valueOf(3);
+		BigInteger six = BigInteger.valueOf(6);
+
+		BigInteger vertexCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "vc"));
+		BigInteger unidirectionalEdgeCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "uec") / 2);
+		BigInteger bidirectionalEdgeCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "bec") / 2);
+		BigInteger triplet021dCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "021d"));
+		BigInteger triplet021uCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "021u"));
+		BigInteger triplet021cCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "021c"));
+		BigInteger triplet111dCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "111d"));
+		BigInteger triplet111uCount = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "111u"));
+		BigInteger triplet201Count = BigInteger.valueOf((Long)vertexDegreesHelper.getAccumulator(env, "201"));
+
+		// triads with three connecting edges = closed triplet = triangle
+		BigInteger triangle030tCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "030t"));
+		BigInteger triangle030cCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "030c"));
+		BigInteger triangle120dCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "120d"));
+		BigInteger triangle120uCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "120u"));
+		BigInteger triangle120cCount = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "120c"));
+		BigInteger triangle210Count = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "210"));
+		BigInteger triangle300Count = BigInteger.valueOf((Long)triangleListingHelper.getAccumulator(env, "300"));
+
+		// triads with two connecting edges = open triplet;
+		// each triangle deducts the count of three triplets
+		triplet201Count = triplet201Count.subtract(triangle300Count.multiply(three));
+
+		triplet201Count = triplet201Count.subtract(triangle210Count);
+		triplet111dCount = triplet111dCount.subtract(triangle210Count);
+		triplet111uCount = triplet111uCount.subtract(triangle210Count);
+
+		triplet111dCount = triplet111dCount.subtract(triangle120cCount);
+		triplet111uCount = triplet111uCount.subtract(triangle120cCount);
+		triplet021cCount = triplet021cCount.subtract(triangle120cCount);
+
+		triplet111uCount = triplet111uCount.subtract(triangle120uCount.multiply(two));
+		triplet021uCount = triplet021uCount.subtract(triangle120uCount);
+
+		triplet111dCount = triplet111dCount.subtract(triangle120dCount.multiply(two));
+		triplet021dCount = triplet021dCount.subtract(triangle120dCount);
+
+		triplet021cCount = triplet021cCount.subtract(triangle030cCount.multiply(three));
+
+		triplet021cCount = triplet021cCount.subtract(triangle030tCount);
+		triplet021uCount = triplet021uCount.subtract(triangle030tCount);
+		triplet021dCount = triplet021dCount.subtract(triangle030tCount);
+
+		// triads with one connecting edge; each edge pairs with `vertex count - 2` vertices;
+		// each triangle deducts from three and each open triplet from two edges
+		BigInteger edge102 = bidirectionalEdgeCount
+			.multiply(vertexCount.subtract(two))
+			.subtract(triplet111dCount)
+			.subtract(triplet111uCount)
+			.subtract(triplet201Count.multiply(two))
+			.subtract(triangle120dCount)
+			.subtract(triangle120uCount)
+			.subtract(triangle120cCount)
+			.subtract(triangle210Count.multiply(two))
+			.subtract(triangle300Count.multiply(three));
+
+		BigInteger edge012 = unidirectionalEdgeCount
+			.multiply(vertexCount.subtract(two))
+			.subtract(triplet021dCount.multiply(two))
+			.subtract(triplet021uCount.multiply(two))
+			.subtract(triplet021cCount.multiply(two))
+			.subtract(triplet111dCount)
+			.subtract(triplet111uCount)
+			.subtract(triangle030tCount.multiply(three))
+			.subtract(triangle030cCount.multiply(three))
+			.subtract(triangle120dCount.multiply(two))
+			.subtract(triangle120uCount.multiply(two))
+			.subtract(triangle120cCount.multiply(two))
+			.subtract(triangle210Count);
+
+		// triads with zero connecting edges;
+		// (vertex count choose 3) minus earlier counts
+		BigInteger triad003 = vertexCount
+			.multiply(vertexCount.subtract(one))
+			.multiply(vertexCount.subtract(two))
+			.divide(six)
+			.subtract(edge012)
+			.subtract(edge102)
+			.subtract(triplet021dCount)
+			.subtract(triplet021uCount)
+			.subtract(triplet021cCount)
+			.subtract(triplet111dCount)
+			.subtract(triplet111uCount)
+			.subtract(triangle030tCount)
+			.subtract(triangle030cCount)
+			.subtract(triplet201Count)
+			.subtract(triangle120dCount)
+			.subtract(triangle120uCount)
+			.subtract(triangle120cCount)
+			.subtract(triangle210Count)
+			.subtract(triangle300Count);
+
+		return new Result(triad003, edge012, edge102, triplet021dCount,
+			triplet021uCount, triplet021cCount, triplet111dCount, triplet111uCount,
+			triangle030tCount, triangle030cCount, triplet201Count, triangle120dCount,
+			triangle120uCount, triangle120cCount, triangle210Count, triangle300Count);
+	}
+
+	/**
+	 * Helper class to collect triadic census metrics from the triangle listing.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class TriangleListingHelper<T>
+	extends AnalyticHelper<TriangleListing.Result<T>> {
+		private long[] triangleCount = new long[64];
+
+		@Override
+		public void writeRecord(TriangleListing.Result<T> record) throws IOException {
+			triangleCount[record.f3.getValue()]++;
+		}
+
+		@Override
+		public void close() throws IOException {
+			// see table from Batagelj and Mrvar, "A subquadratic triad census algorithm for large
+			// sparse networks with small maximum degree" (this Flink algorithm does not use their
+			// algorithm as we do not assume a small maximum degree)
+			int[] typeTable = new int[]{
+				1, 2, 2, 3, 2, 4, 6, 8,
+				2, 6, 5, 7, 3, 8, 7, 11,
+				2, 6, 4, 8, 5, 9, 9, 13,
+				6, 10, 9, 14, 7, 14, 12, 15,
+				2, 5, 6, 7, 6, 9, 10, 14,
+				4, 9, 9, 12, 8, 13, 14, 15,
+				3, 7, 8, 11, 7, 12, 14, 15,
+				8, 14, 13, 15, 11, 15, 15, 16};
+
+			long triangle030tCount = 0;
+			long triangle030cCount = 0;
+			long triangle120dCount = 0;
+			long triangle120uCount = 0;
+			long triangle120cCount = 0;
+			long triangle210Count = 0;
+			long triangle300tCount = 0;
+
+			for (int i = 0 ; i < typeTable.length ; i++) {
+				if (typeTable[i] == 9) {
+					triangle030tCount += triangleCount[i];
+				} else if (typeTable[i] == 10) {
+					triangle030cCount += triangleCount[i];
+				} else if (typeTable[i] == 12) {
+					triangle120dCount += triangleCount[i];
+				} else if (typeTable[i] == 13) {
+					triangle120uCount += triangleCount[i];
+				} else if (typeTable[i] == 14) {
+					triangle120cCount += triangleCount[i];
+				} else if (typeTable[i] == 15) {
+					triangle210Count += triangleCount[i];
+				} else if (typeTable[i] == 16) {
+					triangle300tCount += triangleCount[i];
+				} else {
+					assert triangleCount[i] == 0;
+				}
+			}
+
+			addAccumulator("030t", new LongCounter(triangle030tCount));
+			addAccumulator("030c", new LongCounter(triangle030cCount));
+			addAccumulator("120d", new LongCounter(triangle120dCount));
+			addAccumulator("120u", new LongCounter(triangle120uCount));
+			addAccumulator("120c", new LongCounter(triangle120cCount));
+			addAccumulator("210", new LongCounter(triangle210Count));
+			addAccumulator("300", new LongCounter(triangle300tCount));
+		}
+	}
+
+	/**
+	 * Helper class to collect triadic census metrics from vertex degrees.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class VertexDegreesHelper<T>
+	extends AnalyticHelper<Vertex<T, Degrees>> {
+		private long vertexCount;
+		private long unidirectionalEdgeCount;
+		private long bidirectionalEdgeCount;
+		private long triplet021dCount;
+		private long triplet021uCount;
+		private long triplet021cCount;
+		private long triplet111dCount;
+		private long triplet111uCount;
+		private long triplet201Count;
+
+		@Override
+		public void writeRecord(Vertex<T, Degrees> record) throws IOException {
+			long degree = record.f1.getDegree().getValue();
+			long outDegree = record.f1.getOutDegree().getValue();
+			long inDegree = record.f1.getInDegree().getValue();
+
+			long unidirectionalEdgesAsSource = degree - inDegree;
+			long unidirectionalEdgesAsTarget = degree - outDegree;
+			long bidirectionalEdges = inDegree + outDegree - degree;
+
+			vertexCount++;
+			unidirectionalEdgeCount += unidirectionalEdgesAsSource + unidirectionalEdgesAsTarget;
+			bidirectionalEdgeCount += bidirectionalEdges;
+
+			triplet021dCount += unidirectionalEdgesAsSource * (unidirectionalEdgesAsSource - 1) / 2;
+			triplet021uCount += unidirectionalEdgesAsTarget * (unidirectionalEdgesAsTarget - 1) / 2;
+			triplet021cCount += unidirectionalEdgesAsSource * unidirectionalEdgesAsTarget;
+			triplet111dCount += unidirectionalEdgesAsTarget * bidirectionalEdges;
+			triplet111uCount += unidirectionalEdgesAsSource * bidirectionalEdges;
+			triplet201Count += bidirectionalEdges * (bidirectionalEdges - 1) / 2;
+		}
+
+		@Override
+		public void close() throws IOException {
+			addAccumulator("vc", new LongCounter(vertexCount));
+			addAccumulator("uec", new LongCounter(unidirectionalEdgeCount));
+			addAccumulator("bec", new LongCounter(bidirectionalEdgeCount));
+			addAccumulator("021d", new LongCounter(triplet021dCount));
+			addAccumulator("021u", new LongCounter(triplet021uCount));
+			addAccumulator("021c", new LongCounter(triplet021cCount));
+			addAccumulator("111d", new LongCounter(triplet111dCount));
+			addAccumulator("111u", new LongCounter(triplet111uCount));
+			addAccumulator("201", new LongCounter(triplet201Count));
+		}
+	}
+
+	/**
+	 * Wraps triadic census metrics.
+	 */
+	public static class Result {
+		private final BigInteger[] counts;
+
+		public Result(BigInteger... counts) {
+			Preconditions.checkArgument(counts.length == 16,
+				"Expected 16 counts but received " + counts.length);
+
+			this.counts = counts;
+		}
+
+		public Result(long... counts) {
+			Preconditions.checkArgument(counts.length == 16,
+				"Expected 16 counts but received " + counts.length);
+
+			this.counts = new BigInteger[counts.length];
+
+			for (int i = 0; i < counts.length; i++) {
+				this.counts[i] = BigInteger.valueOf(counts[i]);
+			}
+		}
+
+		/**
+		 * Get the count of "003" triads which have zero connecting vertices.
+		 *
+		 * @return count of "003" triads
+		 */
+		public BigInteger getCount003() {
+			return counts[0];
+		}
+
+		/**
+		 * Get the count of "012" triads which have one unidirectional edge among the vertices.
+		 *
+		 * @return count of "012" triads
+		 */
+		public BigInteger getCount012() {
+			return counts[1];
+		}
+
+		/**
+		 * Get the count of "102" triads which have one bidirectional edge among the vertices.
+		 *
+		 * @return count of "102" triads
+		 */
+		public BigInteger getCount102() {
+			return counts[2];
+		}
+
+		/**
+		 * Get the count of "021d" triads which have two unidirectional edges among the vertices,
+		 * forming an open triplet; both edges source the center vertex.
+		 *
+		 * @return count of "021d" triads
+		 */
+		public BigInteger getCount021d() {
+			return counts[3];
+		}
+
+		/**
+		 * Get the count of "021u" triads which have two unidirectional edges among the vertices,
+		 * forming an open triplet; both edges target the center vertex.
+		 *
+		 * @return count of "021u" triads
+		 */
+		public BigInteger getCount021u() {
+			return counts[4];
+		}
+
+		/**
+		 * Get the count of "021c" triads which have two unidirectional edges among the vertices,
+		 * forming an open triplet; one edge sources and one edge targets the center vertex.
+		 *
+		 * @return count of "021c" triads
+		 */
+		public BigInteger getCount021c() {
+			return counts[5];
+		}
+
+		/**
+		 * Get the count of "111d" triads which have one unidirectional and one bidirectional edge
+		 * among the vertices, forming an open triplet; the unidirectional edge targets the center vertex.
+		 *
+		 * @return count of "111d" triads
+		 */
+		public BigInteger getCount111d() {
+			return counts[6];
+		}
+
+		/**
+		 * Get the count of "111u" triads which have one unidirectional and one bidirectional edge
+		 * among the vertices, forming an open triplet; the unidirectional edge sources the center vertex.
+		 *
+		 * @return count of "111u" triads
+		 */
+		public BigInteger getCount111u() {
+			return counts[7];
+		}
+
+		/**
+		 * Get the count of "030t" triads which have three unidirectional edges among the vertices,
+		 * forming a closed triplet, a triangle; two of the unidirectional edges source/target the
+		 * same vertex.
+		 *
+		 * @return count of "030t" triads
+		 */
+		public BigInteger getCount030t() {
+			return counts[8];
+		}
+
+		/**
+		 * Get the count of "030c" triads which have three unidirectional edges among the vertices,
+		 * forming a closed triplet, a triangle; the three unidirectional edges both source and target
+		 * different vertices.
+		 *
+		 * @return count of "030c" triads
+		 */
+		public BigInteger getCount030c() {
+			return counts[9];
+		}
+
+		/**
+		 * Get the count of "201" triads which have two unidirectional edges among the vertices,
+		 * forming an open triplet.
+		 *
+		 * @return count of "201" triads
+		 */
+		public BigInteger getCount201() {
+			return counts[10];
+		}
+
+		/**
+		 * Get the count of "120d" triads which have two unidirectional edges and one bidirectional edge
+		 * among the vertices, forming a closed triplet, a triangle; both unidirectional edges source
+		 * the same vertex.
+		 *
+		 * @return count of "120d" triads
+		 */
+		public BigInteger getCount120d() {
+			return counts[11];
+		}
+
+		/**
+		 * Get the count of "120u" triads which have two unidirectional and one bidirectional edges
+		 * among the vertices, forming a closed triplet, a triangle; both unidirectional edges target
+		 * the same vertex.
+		 *
+		 * @return count of "120u" triads
+		 */
+		public BigInteger getCount120u() {
+			return counts[12];
+		}
+
+		/**
+		 * Get the count of "120c" triads which have two unidirectional edges and one bidirectional edge
+		 * among the vertices, forming a closed triplet, a triangle; one vertex is sourced by and targeted
+		 * by the unidirectional edges.
+		 *
+		 * @return count of "120c" triads
+		 */
+		public BigInteger getCount120c() {
+			return counts[13];
+		}
+
+		/**
+		 * Get the count of "210" triads which have one unidirectional edge and two bidirectional edges
+		 * among the vertices, forming a closed triplet, a triangle.
+		 *
+		 * @return count of "210" triads
+		 */
+		public BigInteger getCount210() {
+			return counts[14];
+		}
+
+		/**
+		 * Get the count of "300" triads which have three bidirectional edges among the vertices,
+		 * forming a closed triplet, a triangle.
+		 *
+		 * @return count of "300" triads
+		 */
+		public BigInteger getCount300() {
+			return counts[15];
+		}
+
+		/**
+		 * Get the array of counts.
+		 *
+		 * The order of the counts is from least to most connected:
+		 *   003, 012, 102, 021d, 021u, 021c, 111d, 111u,
+		 *   030t, 030c, 201, 120d, 120u, 120c, 210, 300
+		 *
+		 * @return array of counts
+		 */
+		public BigInteger[] getCounts() {
+			return counts;
+		}
+
+		@Override
+		public String toString() {
+			NumberFormat nf = NumberFormat.getInstance();
+
+			return "003: " + nf.format(getCount003())
+				+ "; 012: " + nf.format(getCount012())
+				+ "; 102: " + nf.format(getCount102())
+				+ "; 021d: " + nf.format(getCount021d())
+				+ "; 021u: " + nf.format(getCount021u())
+				+ "; 021c: " + nf.format(getCount021c())
+				+ "; 111d: " + nf.format(getCount111d())
+				+ "; 111u: " + nf.format(getCount111u())
+				+ "; 030t: " + nf.format(getCount030t())
+				+ "; 030c: " + nf.format(getCount030c())
+				+ "; 201: " + nf.format(getCount201())
+				+ "; 120d: " + nf.format(getCount120d())
+				+ "; 120u: " + nf.format(getCount120u())
+				+ "; 120c: " + nf.format(getCount120c())
+				+ "; 210: " + nf.format(getCount210())
+				+ "; 300: " + nf.format(getCount300());
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder()
+				.append(counts)
+				.hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == null) { return false; }
+			if (obj == this) { return true; }
+			if (obj.getClass() != getClass()) { return false; }
+
+			Result rhs = (Result)obj;
+
+			return new EqualsBuilder()
+				.append(counts, rhs.counts)
+				.isEquals();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86a3dd58/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
new file mode 100644
index 0000000..f057803
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
@@ -0,0 +1,237 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.library.clustering.undirected.TriadicCensus.Result;
+import org.apache.flink.graph.library.metric.undirected.VertexMetrics;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.Preconditions;
+
+import java.math.BigInteger;
+import java.text.NumberFormat;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * A triad is formed by three connected or unconnected vertices in a graph.
+ * The triadic census counts the occurrences of each type of triad.
+ * <br/>
+ * The four types of undirected triads are formed with 0, 1, 2, or 3
+ * connecting edges.
+ * <br/>
+ * http://vlado.fmf.uni-lj.si/pub/networks/doc/triads/triads.pdf
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class TriadicCensus<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+	private TriangleCount<K, VV, EV> triangleCount;
+
+	private VertexMetrics<K, VV, EV> vertexMetrics;
+
+	// Optional configuration
+	private int littleParallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Override the parallelism of operators processing small amounts of data.
+	 *
+	 * @param littleParallelism operator parallelism
+	 * @return this
+	 */
+	public TriadicCensus<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		this.littleParallelism = littleParallelism;
+
+		return this;
+	}
+
+	@Override
+	public TriadicCensus<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		triangleCount = new TriangleCount<K, VV, EV>()
+			.setLittleParallelism(littleParallelism);
+
+		input.run(triangleCount);
+
+		vertexMetrics = new VertexMetrics<K, VV, EV>()
+			.setParallelism(littleParallelism);
+
+		input.run(vertexMetrics);
+
+		return this;
+	}
+
+	@Override
+	public Result getResult() {
+		// vertex metrics
+		BigInteger bigVertexCount = BigInteger.valueOf(vertexMetrics.getResult().getNumberOfVertices());
+		BigInteger bigEdgeCount = BigInteger.valueOf(vertexMetrics.getResult().getNumberOfEdges());
+		BigInteger bigTripletCount = BigInteger.valueOf(vertexMetrics.getResult().getNumberOfTriplets());
+
+		// triangle count
+		BigInteger bigTriangleCount = BigInteger.valueOf(triangleCount.getResult());
+
+		BigInteger one = BigInteger.ONE;
+		BigInteger two = BigInteger.valueOf(2);
+		BigInteger three = BigInteger.valueOf(3);
+		BigInteger six = BigInteger.valueOf(6);
+
+		// counts as ordered in TriadicCensus.Result
+		BigInteger[] counts = new BigInteger[4];
+
+		// triads with three connecting edges = closed triplet = triangle
+		counts[3] = bigTriangleCount;
+
+		// triads with two connecting edges = open triplet;
+		// deduct each triplet having been counted three times per triangle
+		counts[2] = bigTripletCount.subtract(bigTriangleCount.multiply(three));
+
+		// triads with one connecting edge; each edge pairs with `vertex count - 2` vertices
+		// then deduct twice for each open triplet and three times for each triangle
+		counts[1] = bigEdgeCount
+			.multiply(bigVertexCount.subtract(two))
+			.subtract(counts[2].multiply(two))
+			.subtract(counts[3].multiply(three));
+
+		// triads with zero connecting edges;
+		// (vertex count choose 3) minus earlier counts
+		counts[0] = bigVertexCount
+			.multiply(bigVertexCount.subtract(one))
+			.multiply(bigVertexCount.subtract(two))
+			.divide(six)
+			.subtract(counts[1])
+			.subtract(counts[2])
+			.subtract(counts[3]);
+
+		return new Result(counts);
+	}
+
+	/**
+	 * Wraps triadic census metrics.
+	 */
+	public static class Result {
+		private final BigInteger[] counts;
+
+		public Result(BigInteger... counts) {
+			Preconditions.checkArgument(counts.length == 4,
+				"Expected 4 counts but received " + counts.length);
+
+			this.counts = counts;
+		}
+
+		public Result(long... counts) {
+			Preconditions.checkArgument(counts.length == 4,
+				"Expected 4 counts but received " + counts.length);
+
+			this.counts = new BigInteger[counts.length];
+
+			for (int i = 0; i < counts.length; i++) {
+				this.counts[i] = BigInteger.valueOf(counts[i]);
+			}
+		}
+
+		/**
+		 * Get the count of "03" triads which have zero connecting vertices.
+		 *
+		 * @return count of "03" triads
+		 */
+		public BigInteger getCount03() {
+			return counts[0];
+		}
+
+		/**
+		 * Get the count of "12" triads which have one edge among the vertices.
+		 *
+		 * @return count of "12" triads
+		 */
+		public BigInteger getCount12() {
+			return counts[1];
+		}
+
+		/**
+		 * Get the count of "21" triads which have two edges among the vertices
+		 * and form a open triplet.
+		 *
+		 * @return count of "21" triads
+		 */
+		public BigInteger getCount21() {
+			return counts[2];
+		}
+
+		/**
+		 * Get the count of "30" triads which have three edges among the vertices
+		 * and form a closed triplet, a triangle.
+		 *
+		 * @return count of "30" triads
+		 */
+		public BigInteger getCount30() {
+			return counts[3];
+		}
+
+		/**
+		 * Get the array of counts.
+		 *
+		 * The order of the counts is from least to most connected:
+		 *   03, 12, 21, 30
+		 *
+		 * @return array of counts
+		 */
+		public BigInteger[] getCounts() {
+			return counts;
+		}
+
+		@Override
+		public String toString() {
+			NumberFormat nf = NumberFormat.getInstance();
+
+			return "03: " + nf.format(getCount03())
+				+ "; 12: " + nf.format(getCount12())
+				+ "; 21: " + nf.format(getCount21())
+				+ "; 30: " + nf.format(getCount30());
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder()
+				.append(counts)
+				.hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == null) { return false; }
+			if (obj == this) { return true; }
+			if (obj.getClass() != getClass()) { return false; }
+
+			Result rhs = (Result)obj;
+
+			return new EqualsBuilder()
+				.append(counts, rhs.counts)
+				.isEquals();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86a3dd58/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index 07f4eed..fba72ed 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -32,13 +32,12 @@ import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.asm.degree.annotate.directed.EdgeDegreesPair;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.library.metric.directed.EdgeMetrics.Result;
-import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
 
@@ -58,7 +57,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <VV> vertex value type
  * @param <EV> edge value type
  */
-public class EdgeMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+public class EdgeMetrics<K extends Comparable<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 	private static final String TRIANGLE_TRIPLET_COUNT = "triangleTripletCount";

http://git-wip-us.apache.org/repos/asf/flink/blob/86a3dd58/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 1285bb9..c90423b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -24,13 +24,12 @@ import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.accumulators.LongMaximum;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.AbstractGraphAnalytic;
-import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.AnalyticHelper;
+import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 import org.apache.flink.graph.library.metric.directed.VertexMetrics.Result;
-import org.apache.flink.types.CopyableValue;
 
 import java.io.IOException;
 import java.text.NumberFormat;
@@ -54,7 +53,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <VV> vertex value type
  * @param <EV> edge value type
  */
-public class VertexMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+public class VertexMetrics<K extends Comparable<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 	private static final String VERTEX_COUNT = "vertexCount";

http://git-wip-us.apache.org/repos/asf/flink/blob/86a3dd58/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index a4deeaf..af4a57f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -29,12 +29,11 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.AnalyticHelper;
 import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
 import org.apache.flink.graph.library.metric.undirected.EdgeMetrics.Result;
-import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 
 import java.io.IOException;
@@ -53,7 +52,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <VV> vertex value type
  * @param <EV> edge value type
  */
-public class EdgeMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+public class EdgeMetrics<K extends Comparable<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 	private static final String TRIANGLE_TRIPLET_COUNT = "triangleTripletCount";

http://git-wip-us.apache.org/repos/asf/flink/blob/86a3dd58/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index ee67129..5af7cf6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -24,12 +24,11 @@ import org.apache.flink.api.common.accumulators.LongCounter;
 import org.apache.flink.api.common.accumulators.LongMaximum;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.AbstractGraphAnalytic;
-import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.AnalyticHelper;
+import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
 import org.apache.flink.graph.library.metric.undirected.VertexMetrics.Result;
-import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 
 import java.io.IOException;
@@ -50,7 +49,7 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
  * @param <VV> vertex value type
  * @param <EV> edge value type
  */
-public class VertexMetrics<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+public class VertexMetrics<K extends Comparable<K>, VV, EV>
 extends AbstractGraphAnalytic<K, VV, EV, Result> {
 
 	private static final String VERTEX_COUNT = "vertexCount";

http://git-wip-us.apache.org/repos/asf/flink/blob/86a3dd58/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriadicCensusTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriadicCensusTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriadicCensusTest.java
new file mode 100644
index 0000000..965f602
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/TriadicCensusTest.java
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.clustering.directed.TriadicCensus.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TriadicCensusTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithUndirectedSimpleGraph()
+			throws Exception {
+		Result expectedResult = new Result(3, 0, 8, 0, 0, 0, 0, 0, 0, 0, 7, 0, 0, 0, 0, 2);
+
+		Result triadCensus = new TriadicCensus<IntValue, NullValue, NullValue>()
+			.run(undirectedSimpleGraph)
+			.execute();
+
+		assertEquals(expectedResult, triadCensus);
+	}
+
+	@Test
+	public void testWithDirectedSimpleGraph()
+			throws Exception {
+		Result expectedResult = new Result(3, 8, 0, 1, 2, 4, 0, 0, 2, 0, 0, 0, 0, 0, 0, 0);
+
+		Result triadCensus = new TriadicCensus<IntValue, NullValue, NullValue>()
+			.run(directedSimpleGraph)
+			.execute();
+
+		assertEquals(expectedResult, triadCensus);
+	}
+
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		long expectedDegree = completeGraphVertexCount - 1;
+		long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2) / 3;
+
+		Result expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, expectedCount);
+
+		Result triadCensus = new TriadicCensus<LongValue, NullValue, NullValue>()
+			.run(completeGraph)
+			.execute();
+
+		assertEquals(expectedResult, triadCensus);
+	}
+
+	@Test
+	public void testWithEmptyGraph()
+			throws Exception {
+		Result expectedResult = new Result(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+
+		Result triadCensus = new TriadicCensus<LongValue, NullValue, NullValue>()
+			.run(emptyGraph)
+			.execute();
+
+		assertEquals(expectedResult, triadCensus);
+	}
+
+	@Test
+	public void testWithUndirectedRMatGraph()
+			throws Exception {
+		Result expectedResult = new Result(113_435_893, 0, 7_616_063, 0, 0, 0, 0, 0, 0, 0, 778_295, 0, 0, 0, 0, 75_049);
+
+		Result triadCensus = new TriadicCensus<LongValue, NullValue, NullValue>()
+			.run(undirectedRMatGraph)
+			.execute();
+
+		assertEquals(expectedResult, triadCensus);
+	}
+
+	/*
+	 * This test result can be verified with the following Python script.
+
+	import networkx as nx
+
+	graph=nx.read_edgelist('directedRMatGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	census=nx.algorithms.triads.triadic_census(graph)
+	for key in ['003', '012', '102', '021D', '021U', '021C', '111D', '111U', \
+				'030T', '030C', '201', '120D', '120U', '120C', '210', '300']:
+		print('{}: {}'.format(key, census[key]))
+	 */
+	@Test
+	public void testWithDirectedRMatGraph()
+			throws Exception {
+		Result expectedResult = new Result(
+			113_435_893, 6_632_528, 983_535, 118_574,
+			118_566, 237_767, 129_773, 130_041,
+			16_981, 5_535, 43_574, 7_449,
+			7_587, 15_178, 17_368, 4_951);
+
+		Result triadCensus = new TriadicCensus<LongValue, NullValue, NullValue>()
+			.run(directedRMatGraph)
+			.execute();
+
+		assertEquals(expectedResult, triadCensus);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/86a3dd58/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensusTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensusTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensusTest.java
new file mode 100644
index 0000000..6d34d95
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensusTest.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.math3.util.CombinatoricsUtils;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.clustering.undirected.TriadicCensus.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class TriadicCensusTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		Result expectedResult = new Result(3, 8, 7, 2);
+
+		Result triadCensus = new TriadicCensus<IntValue, NullValue, NullValue>()
+			.run(undirectedSimpleGraph)
+			.execute();
+
+		assertEquals(expectedResult, triadCensus);
+	}
+
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		long expectedDegree = completeGraphVertexCount - 1;
+		long expectedCount = completeGraphVertexCount * CombinatoricsUtils.binomialCoefficient((int)expectedDegree, 2) / 3;
+
+		Result expectedResult = new Result(0, 0, 0, expectedCount);
+
+		Result triadCensus = new TriadicCensus<LongValue, NullValue, NullValue>()
+			.run(completeGraph)
+			.execute();
+
+		assertEquals(expectedResult, triadCensus);
+	}
+
+	@Test
+	public void testWithEmptyGraph()
+			throws Exception {
+		Result expectedResult = new Result(0, 0, 0, 0);
+
+		Result triadCensus = new TriadicCensus<LongValue, NullValue, NullValue>()
+			.run(emptyGraph)
+			.execute();
+
+		assertEquals(expectedResult, triadCensus);
+	}
+
+	/*
+	 * This test result can be verified with the following Python script.
+
+	import networkx as nx
+
+	graph=nx.read_edgelist('undirectedRMatGraph.csv', delimiter=',', create_using=nx.DiGraph())
+	census=nx.algorithms.triads.triadic_census(graph)
+	for key in ['003', '102', '201', '300']:
+		print('{}: {}'.format(key, census[key]))
+	 */
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		Result expectedResult = new Result(113_435_893, 7_616_063, 778_295, 75_049);
+
+		Result triadCensus = new TriadicCensus<LongValue, NullValue, NullValue>()
+			.run(undirectedRMatGraph)
+			.execute();
+
+		assertEquals(expectedResult, triadCensus);
+	}
+}