You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/10/03 14:55:06 UTC

[1/2] flink git commit: [FLINK-4624] [gelly] Support null values in Graph Summarization

Repository: flink
Updated Branches:
  refs/heads/master 9dbd1e3f7 -> 95c08eab3


[FLINK-4624] [gelly] Support null values in Graph Summarization

* Bug was caused by serializers that cannot handle null values (e.g. Long)
* VertexGroupItem now uses Either<NullValue, VV> instead of VV
* Generalized test cases
* Added tests for vertex/edge values of type Long
* Replaced Guava Lists.newArrayList() with new ArrayList<>()

This closes #2527


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a79efdc2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a79efdc2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a79efdc2

Branch: refs/heads/master
Commit: a79efdc23e1bd35eb7cd2e5eb9551cc297c02dd5
Parents: 9dbd1e3
Author: Martin Junghanns <ma...@gmx.net>
Authored: Wed Sep 21 13:31:41 2016 +0200
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Oct 3 10:36:43 2016 -0400

----------------------------------------------------------------------
 .../graph/examples/data/SummarizationData.java  | 101 ++++++++++---------
 .../graph/library/SummarizationITCase.java      |  90 ++++++++++-------
 .../flink/graph/library/Summarization.java      |  72 ++++++++-----
 3 files changed, 160 insertions(+), 103 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a79efdc2/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
index ea60ea0..703b66e 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
@@ -18,12 +18,10 @@
 
 package org.apache.flink.graph.examples.data;
 
-import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.NullValue;
 
 import java.util.ArrayList;
 import java.util.List;
@@ -35,6 +33,43 @@ public class SummarizationData {
 
 	private SummarizationData() {}
 
+
+	/**
+	 * Vertices of the input graph.
+	 *
+	 * Format:
+	 *
+	 * "vertex-id;vertex-value"
+	 */
+	private static final String[] INPUT_VERTICES = new String[] {
+		"0;1",
+		"1;1",
+		"2;2",
+		"3;2",
+		"4;2",
+		"5;3"
+	};
+
+	/**
+	 * Edges of the input graph.
+	 *
+	 * Format:
+	 *
+	 * "source-id;target-id;edge-value"
+	 */
+	private static final String[] INPUT_EDGES = new String[] {
+		"0;1;1",
+		"1;0;1",
+		"1;2;1",
+		"2;1;1",
+		"2;3;2",
+		"3;2;2",
+		"4;0;3",
+		"4;1;3",
+		"5;2;4",
+		"5;3;4"
+	};
+
 	/**
 	 * The resulting vertex id can be any id of the vertices summarized by the single vertex.
 	 *
@@ -43,9 +78,9 @@ public class SummarizationData {
 	 * "possible-id[,possible-id];group-value,group-count"
 	 */
 	public static final String[] EXPECTED_VERTICES = new String[] {
-			"0,1;A,2",
-			"2,3,4;B,3",
-			"5;C,1"
+			"0,1;1,2",
+			"2,3,4;2,3",
+			"5;3,1"
 	};
 
 	/**
@@ -54,12 +89,12 @@ public class SummarizationData {
 	 * "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
 	 */
 	public static final String[] EXPECTED_EDGES_WITH_VALUES = new String[] {
-			"0,1;0,1;A,2",
-			"0,1;2,3,4;A,1",
-			"2,3,4;0,1;A,1",
-			"2,3,4;0,1;C,2",
-			"2,3,4;2,3,4;B,2",
-			"5;2,3,4;D,2"
+			"0,1;0,1;1,2",
+			"0,1;2,3,4;1,1",
+			"2,3,4;0,1;1,1",
+			"2,3,4;0,1;3,2",
+			"2,3,4;2,3,4;2,2",
+			"5;2,3,4;4,2"
 	};
 
 	/**
@@ -82,13 +117,11 @@ public class SummarizationData {
 	 * @return vertex data set with string values
 	 */
 	public static DataSet<Vertex<Long, String>> getVertices(ExecutionEnvironment env) {
-		List<Vertex<Long, String>> vertices = new ArrayList<>(6);
-		vertices.add(new Vertex<>(0L, "A"));
-		vertices.add(new Vertex<>(1L, "A"));
-		vertices.add(new Vertex<>(2L, "B"));
-		vertices.add(new Vertex<>(3L, "B"));
-		vertices.add(new Vertex<>(4L, "B"));
-		vertices.add(new Vertex<>(5L, "C"));
+		List<Vertex<Long, String>> vertices = new ArrayList<>(INPUT_VERTICES.length);
+		for (String vertex : INPUT_VERTICES) {
+			String[] tokens = vertex.split(";");
+			vertices.add(new Vertex<>(Long.parseLong(tokens[0]), tokens[1]));
+		}
 
 		return env.fromCollection(vertices);
 	}
@@ -100,34 +133,12 @@ public class SummarizationData {
 	 * @return edge data set with string values
 	 */
 	public static DataSet<Edge<Long, String>> getEdges(ExecutionEnvironment env) {
-		List<Edge<Long, String>> edges = new ArrayList<>(10);
-		edges.add(new Edge<>(0L, 1L, "A"));
-		edges.add(new Edge<>(1L, 0L, "A"));
-		edges.add(new Edge<>(1L, 2L, "A"));
-		edges.add(new Edge<>(2L, 1L, "A"));
-		edges.add(new Edge<>(2L, 3L, "B"));
-		edges.add(new Edge<>(3L, 2L, "B"));
-		edges.add(new Edge<>(4L, 0L, "C"));
-		edges.add(new Edge<>(4L, 1L, "C"));
-		edges.add(new Edge<>(5L, 2L, "D"));
-		edges.add(new Edge<>(5L, 3L, "D"));
+		List<Edge<Long, String>> edges = new ArrayList<>(INPUT_EDGES.length);
+		for (String edge : INPUT_EDGES) {
+			String[] tokens = edge.split(";");
+			edges.add(new Edge<>(Long.parseLong(tokens[0]), Long.parseLong(tokens[1]), tokens[2]));
+		}
 
 		return env.fromCollection(edges);
 	}
-
-	/**
-	 * Creates a set of edges with {@link NullValue} as edge value.
-	 *
-	 * @param env execution environment
-	 * @return edge data set with null values
-	 */
-	@SuppressWarnings("serial")
-	public static DataSet<Edge<Long, NullValue>> getEdgesWithAbsentValues(ExecutionEnvironment env) {
-		return getEdges(env).map(new MapFunction<Edge<Long, String>, Edge<Long, NullValue>>() {
-			@Override
-			public Edge<Long, NullValue> map(Edge<Long, String> value) throws Exception {
-				return new Edge<>(value.getSource(), value.getTarget(), NullValue.getInstance());
-			}
-		});
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a79efdc2/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
index 17ddcfa..43514dc 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
@@ -18,12 +18,15 @@
 
 package org.apache.flink.graph.library;
 
-import com.google.common.collect.Lists;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.translate.TranslateEdgeValues;
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.graph.asm.translate.TranslateVertexValues;
+import org.apache.flink.graph.asm.translate.translators.ToNullValue;
 import org.apache.flink.graph.examples.data.SummarizationData;
 import org.apache.flink.graph.library.Summarization.EdgeValue;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
@@ -32,6 +35,7 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
@@ -53,16 +57,15 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testWithVertexAndEdgeValues() throws Exception {
+	public void testWithVertexAndEdgeStringValues() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		Graph<Long, String, String> input = Graph.fromDataSet(
 				SummarizationData.getVertices(env),
 				SummarizationData.getEdges(env),
-				env
-		);
+				env);
 
-		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
-		List<Edge<Long, EdgeValue<String>>> summarizedEdges = Lists.newArrayList();
+		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = new ArrayList<>();
+		List<Edge<Long, EdgeValue<String>>> summarizedEdges = new ArrayList<>();
 
 		Graph<Long, Summarization.VertexValue<String>, EdgeValue<String>> output =
 				input.run(new Summarization<Long, String, String>());
@@ -77,16 +80,16 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testWithVertexAndAbsentEdgeValues() throws Exception {
+	public void testWithVertexAndAbsentEdgeStringValues() throws Exception {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		Graph<Long, String, NullValue> input = Graph.fromDataSet(
 				SummarizationData.getVertices(env),
-				SummarizationData.getEdgesWithAbsentValues(env),
-				env
-		);
+				SummarizationData.getEdges(env),
+				env)
+			.run(new TranslateEdgeValues<Long, String, String, NullValue>(new ToNullValue<String>()));
 
-		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
-		List<Edge<Long, EdgeValue<NullValue>>> summarizedEdges = Lists.newArrayList();
+		List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = new ArrayList<>();
+		List<Edge<Long, EdgeValue<NullValue>>> summarizedEdges = new ArrayList<>();
 
 		Graph<Long, Summarization.VertexValue<String>, EdgeValue<NullValue>> output =
 				input.run(new Summarization<Long, String, NullValue>());
@@ -100,23 +103,41 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
 		validateEdges(SummarizationData.EXPECTED_EDGES_ABSENT_VALUES, summarizedEdges);
 	}
 
-	private void validateVertices(String[] expectedVertices,
-																List<Vertex<Long, Summarization.VertexValue<String>>> actualVertices) {
+	@Test
+	public void testWithVertexAndEdgeLongValues() throws Exception {
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> input = Graph.fromDataSet(
+				SummarizationData.getVertices(env),
+				SummarizationData.getEdges(env),
+				env)
+			.run(new TranslateVertexValues<Long, String, Long, String>(new StringToLong()))
+			.run(new TranslateEdgeValues<Long, Long, String, Long>(new StringToLong()));
+
+		List<Vertex<Long, Summarization.VertexValue<Long>>> summarizedVertices = new ArrayList<>();
+		List<Edge<Long, EdgeValue<Long>>> summarizedEdges = new ArrayList<>();
+
+		Graph<Long, Summarization.VertexValue<Long>, EdgeValue<Long>> output =
+			input.run(new Summarization<Long, Long, Long>());
+
+		output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
+		output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
+
+		env.execute();
+
+		validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
+		validateEdges(SummarizationData.EXPECTED_EDGES_WITH_VALUES, summarizedEdges);
+	}
+
+	private <VV extends Comparable<VV>> void validateVertices(String[] expectedVertices, List<Vertex<Long, Summarization.VertexValue<VV>>> actualVertices) {
 		Arrays.sort(expectedVertices);
-		Collections.sort(actualVertices, new Comparator<Vertex<Long, Summarization.VertexValue<String>>>() {
+		Collections.sort(actualVertices, new Comparator<Vertex<Long, Summarization.VertexValue<VV>>>() {
 			@Override
-			public int compare(Vertex<Long, Summarization.VertexValue<String>> o1,
-												 Vertex<Long, Summarization.VertexValue<String>> o2) {
+			public int compare(Vertex<Long, Summarization.VertexValue<VV>> o1, Vertex<Long, Summarization.VertexValue<VV>> o2) {
 				int result = o1.getId().compareTo(o2.getId());
 				if (result == 0) {
 					result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
 				}
-				if (result == 0) {
-					result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
-				}
-				if (result == 0) {
-					result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
-				}
 				return result;
 			}
 		});
@@ -126,8 +147,7 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
 		}
 	}
 
-	private <EV extends Comparable<EV>> void validateEdges(String[] expectedEdges,
-														 List<Edge<Long, EdgeValue<EV>>> actualEdges) {
+	private <EV extends Comparable<EV>> void validateEdges(String[] expectedEdges, List<Edge<Long, EdgeValue<EV>>> actualEdges) {
 		Arrays.sort(expectedEdges);
 		Collections.sort(actualEdges, new Comparator<Edge<Long, EdgeValue<EV>>> () {
 
@@ -138,14 +158,8 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
 					result = o1.getTarget().compareTo(o2.getTarget());
 				}
 				if (result == 0) {
-					result = o1.getTarget().compareTo(o2.getTarget());
-				}
-				if (result == 0) {
 					result = o1.getValue().getEdgeGroupValue().compareTo(o2.getValue().getEdgeGroupValue());
 				}
-				if (result == 0) {
-					result = o1.getValue().getEdgeGroupCount().compareTo(o2.getValue().getEdgeGroupCount());
-				}
 				return result;
 			}
 		});
@@ -155,10 +169,10 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
 		}
 	}
 
-	private void validateVertex(String expected, Vertex<Long, Summarization.VertexValue<String>> actual) {
+	private <VV> void validateVertex(String expected, Vertex<Long, Summarization.VertexValue<VV>> actual) {
 		String[] tokens = TOKEN_SEPARATOR.split(expected);
 		assertTrue(getListFromIdRange(tokens[0]).contains(actual.getId()));
-		assertEquals(getGroupValue(tokens[1]), actual.getValue().getVertexGroupValue());
+		assertEquals(getGroupValue(tokens[1]), actual.getValue().getVertexGroupValue().toString());
 		assertEquals(getGroupCount(tokens[1]), actual.getValue().getVertexGroupCount());
 	}
 
@@ -171,7 +185,7 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
 	}
 
 	private List<Long> getListFromIdRange(String idRange) {
-		List<Long> result = Lists.newArrayList();
+		List<Long> result = new ArrayList<>();
 		for (String id : ID_SEPARATOR.split(idRange)) {
 			result.add(Long.parseLong(id));
 		}
@@ -185,4 +199,12 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
 	private Long getGroupCount(String token) {
 		return Long.valueOf(ID_SEPARATOR.split(token)[1]);
 	}
+
+	private static class StringToLong implements TranslateFunction<String, Long> {
+
+		@Override
+		public Long translate(String value, Long reuse) throws Exception {
+			return Long.parseLong(value);
+		}
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/a79efdc2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
index 0dcdc1f..2361247 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
@@ -22,16 +22,23 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.functions.FunctionAnnotation;
 import org.apache.flink.api.java.operators.GroupReduceOperator;
 import org.apache.flink.api.java.operators.UnsortedGrouping;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
 /**
@@ -90,8 +97,18 @@ public class Summarization<K, VV, EV>
 
 	@Override
 	public Graph<K, VertexValue<VV>, EdgeValue<EV>> run(Graph<K, VV, EV> input) throws Exception {
+		// create type infos for correct return type definitions
+		// Note: this use of type hints is only required due to
+		// limitations of the type parser for the Either type
+		// which are being fixed in FLINK-4673
+		TypeInformation<K> keyType = ((TupleTypeInfo<?>) input.getVertices().getType()).getTypeAt(0);
+		TypeInformation<VV> valueType = ((TupleTypeInfo<?>) input.getVertices().getType()).getTypeAt(1);
+		@SuppressWarnings("unchecked")
+		TupleTypeInfo<Vertex<K, VertexValue<VV>>> vertexType = (TupleTypeInfo<Vertex<K, VertexValue<VV>>>) new TupleTypeInfo(
+			Vertex.class, keyType, new TupleTypeInfo(VertexValue.class, valueType, BasicTypeInfo.LONG_TYPE_INFO));
+
 		// -------------------------
-		// build summarized vertices
+		// build super vertices
 		// -------------------------
 
 		// group vertices by value
@@ -100,19 +117,20 @@ public class Summarization<K, VV, EV>
 		// reduce vertex group and create vertex group items
 		GroupReduceOperator<Vertex<K, VV>, VertexGroupItem<K, VV>> vertexGroupItems = vertexUnsortedGrouping
 				.reduceGroup(new VertexGroupReducer<K, VV>());
-		// create summarized vertices
+		// create super vertices
 		DataSet<Vertex<K, VertexValue<VV>>> summarizedVertices = vertexGroupItems
 				.filter(new VertexGroupItemToSummarizedVertexFilter<K, VV>())
-				.map(new VertexGroupItemToSummarizedVertexMapper<K, VV>());
-		// create mapping between vertices and their representative
-		DataSet<VertexWithRepresentative<K>> vertexToRepresentativeMap = vertexGroupItems
-				.filter(new VertexGroupItemToRepresentativeFilter<K, VV>())
-				.map(new VertexGroupItemToVertexWithRepresentativeMapper<K, VV>());
+				.map(new VertexGroupItemToSummarizedVertexMapper<K, VV>())
+				.returns(vertexType);
 
 		// -------------------------
-		// build summarized edges
+		// build super edges
 		// -------------------------
 
+		// create mapping between vertices and their representative
+		DataSet<VertexWithRepresentative<K>> vertexToRepresentativeMap = vertexGroupItems
+			.filter(new VertexGroupItemToRepresentativeFilter<K, VV>())
+			.map(new VertexGroupItemToVertexWithRepresentativeMapper<K, VV>());
 		// join edges with vertex representatives and update source and target identifiers
 		DataSet<Edge<K, EV>> edgesForGrouping = input.getEdges()
 				.join(vertexToRepresentativeMap)
@@ -123,7 +141,7 @@ public class Summarization<K, VV, EV>
 				.where(1) 	// target vertex id
 				.equalTo(0) // vertex id
 				.with(new TargetVertexJoinFunction<K, EV>());
-		// create summarized edges
+		// create super edges
 		DataSet<Edge<K, EdgeValue<EV>>> summarizedEdges = edgesForGrouping
 				.groupBy(0, 1, 2) // group by source id (0), target id (1) and edge value (2)
 				.reduceGroup(new EdgeGroupReducer<K, EV>());
@@ -203,10 +221,12 @@ public class Summarization<K, VV, EV>
 	 * @param <VGV> vertex group value type
 	 */
 	@SuppressWarnings("serial")
-	public static final class VertexGroupItem<K, VGV> extends Tuple4<K, K, VGV, Long> {
+	public static final class VertexGroupItem<K, VGV> extends Tuple4<K, K, Either<VGV, NullValue>, Long> {
+
+		private final Either.Right<VGV, NullValue> nullValue = new Either.Right<>(NullValue.getInstance());
 
 		public VertexGroupItem() {
-			setVertexGroupCount(0L);
+			reset();
 		}
 
 		public K getVertexId() {
@@ -226,11 +246,15 @@ public class Summarization<K, VV, EV>
 		}
 
 		public VGV getVertexGroupValue() {
-			return f2;
+			return f2.isLeft() ? f2.left() : null;
 		}
 
 		public void setVertexGroupValue(VGV vertexGroupValue) {
-			f2 = vertexGroupValue;
+			if (vertexGroupValue == null) {
+				f2 = nullValue;
+			} else {
+				f2 = new Either.Left<>(vertexGroupValue);
+			}
 		}
 
 		public Long getVertexGroupCount() {
@@ -247,7 +271,7 @@ public class Summarization<K, VV, EV>
 		public void reset() {
 			f0 = null;
 			f1 = null;
-			f2 = null;
+			f2 = nullValue;
 			f3 = 0L;
 		}
 	}
@@ -290,11 +314,13 @@ public class Summarization<K, VV, EV>
 	 */
 	@SuppressWarnings("serial")
 	private static final class VertexGroupReducer<K, VV>
-			implements GroupReduceFunction<Vertex<K, VV>, VertexGroupItem<K, VV>> {
+			extends RichGroupReduceFunction<Vertex<K, VV>, VertexGroupItem<K, VV>> {
 
-		private final VertexGroupItem<K, VV> reuseVertexGroupItem;
+		private transient VertexGroupItem<K, VV> reuseVertexGroupItem;
 
-		private VertexGroupReducer() {
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			super.open(parameters);
 			this.reuseVertexGroupItem = new VertexGroupItem<>();
 		}
 
@@ -329,12 +355,10 @@ public class Summarization<K, VV, EV>
 		 * group.
 		 *
 		 * @param vertexGroupRepresentativeId group representative vertex identifier
-		 * @param vertexGroupValue  					group property value
-		 * @param vertexGroupCount          	total group count
+		 * @param vertexGroupValue  		  group property value
+		 * @param vertexGroupCount            total group count
 		 */
-		private void createGroupRepresentativeTuple(K vertexGroupRepresentativeId,
-																								VV vertexGroupValue,
-																								Long vertexGroupCount) {
+		private void createGroupRepresentativeTuple(K vertexGroupRepresentativeId, VV vertexGroupValue, Long vertexGroupCount) {
 			reuseVertexGroupItem.setVertexId(vertexGroupRepresentativeId);
 			reuseVertexGroupItem.setVertexGroupValue(vertexGroupValue);
 			reuseVertexGroupItem.setVertexGroupCount(vertexGroupCount);
@@ -511,9 +535,9 @@ public class Summarization<K, VV, EV>
 	@FunctionAnnotation.ForwardedFieldsSecond("f1") 	// vertex group id -> edge target id
 	private static final class TargetVertexJoinFunction<K, EV>
 			implements JoinFunction<Edge<K, EV>, VertexWithRepresentative<K>, Edge<K, EV>> {
+
 		@Override
-		public Edge<K, EV> join(Edge<K, EV> edge,
-														VertexWithRepresentative<K> vertexRepresentative) throws Exception {
+		public Edge<K, EV> join(Edge<K, EV> edge, VertexWithRepresentative<K> vertexRepresentative) throws Exception {
 			edge.setTarget(vertexRepresentative.getGroupRepresentativeId());
 			return edge;
 		}


[2/2] flink git commit: [FLINK-4643] [gelly] Average Clustering Coefficient

Posted by gr...@apache.org.
[FLINK-4643] [gelly] Average Clustering Coefficient

Directed and undirected analytics computing the average clustering
coefficient over vertices in a graph and an updated driver.

This closes #2528


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95c08eab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95c08eab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95c08eab

Branch: refs/heads/master
Commit: 95c08eab36bfa09c501a84f5b5f116d666d03ae1
Parents: a79efdc
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Sep 20 12:00:04 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Oct 3 10:37:02 2016 -0400

----------------------------------------------------------------------
 .../graph/examples/ClusteringCoefficient.java   |  32 ++-
 .../directed/AverageClusteringCoefficient.java  | 212 +++++++++++++++++++
 .../AverageClusteringCoefficient.java           | 212 +++++++++++++++++++
 .../org/apache/flink/graph/asm/AsmTestBase.java |   4 +-
 .../AverageClusteringCoefficientTest.java       |  82 +++++++
 .../AverageClusteringCoefficientTest.java       |  82 +++++++
 6 files changed, 622 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
index 7835531..615d765 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
@@ -102,6 +102,7 @@ public class ClusteringCoefficient {
 
 		// global and local clustering coefficient results
 		GraphAnalytic gcc;
+		GraphAnalytic acc;
 		DataSet lcc;
 
 		switch (parameters.get("input", "")) {
@@ -127,6 +128,9 @@ public class ClusteringCoefficient {
 							gcc = graph
 								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
 							lcc = graph
 								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
@@ -134,6 +138,9 @@ public class ClusteringCoefficient {
 							gcc = graph
 								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
 							lcc = graph
 								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
@@ -148,6 +155,9 @@ public class ClusteringCoefficient {
 							gcc = graph
 								.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
 							lcc = graph
 								.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
@@ -155,6 +165,9 @@ public class ClusteringCoefficient {
 							gcc = graph
 								.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
+							acc = graph
+								.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
+									.setLittleParallelism(little_parallelism));
 							lcc = graph
 								.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
 									.setLittleParallelism(little_parallelism));
@@ -189,6 +202,9 @@ public class ClusteringCoefficient {
 						gcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
 								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
 						lcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
 								.setIncludeZeroDegreeVertices(false)
@@ -203,6 +219,9 @@ public class ClusteringCoefficient {
 						gcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
 								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
 						lcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
 								.setIncludeZeroDegreeVertices(false)
@@ -219,6 +238,9 @@ public class ClusteringCoefficient {
 						gcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
 								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
 						lcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
 								.setIncludeZeroDegreeVertices(false)
@@ -233,6 +255,9 @@ public class ClusteringCoefficient {
 						gcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
 								.setLittleParallelism(little_parallelism));
+						acc = newGraph
+							.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+								.setLittleParallelism(little_parallelism));
 						lcc = newGraph
 							.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
 								.setIncludeZeroDegreeVertices(false)
@@ -262,11 +287,13 @@ public class ClusteringCoefficient {
 					}
 				}
 				System.out.println(gcc.getResult());
+				System.out.println(acc.getResult());
 				break;
 
 			case "hash":
 				System.out.println(DataSetUtils.checksumHashCode(lcc));
 				System.out.println(gcc.getResult());
+				System.out.println(acc.getResult());
 				break;
 
 			case "csv":
@@ -280,7 +307,10 @@ public class ClusteringCoefficient {
 
 				lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
 
-				System.out.println(gcc.execute());
+				env.execute("Clustering Coefficient");
+
+				System.out.println(gcc.getResult());
+				System.out.println(acc.getResult());
 				break;
 
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
new file mode 100644
index 0000000..f589d04
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * The average clustering coefficient measures the mean connectedness of a
+ * graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph).
+ * <p>
+ * The average is taken over all vertices. The local clustering coefficient
+ * is only defined for vertices with at least two neighbors; vertices with
+ * fewer neighbors are still counted in the denominator but add 0.0 to the
+ * sum of scores.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class AverageClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+	// unique prefix for this analytic's accumulator names; fixed per instance
+	private final String id = new AbstractID().toString();
+
+	// Optional configuration
+	private int littleParallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Override the parallelism of operators processing small amounts of data.
+	 *
+	 * @param littleParallelism operator parallelism
+	 * @return this
+	 */
+	public AverageClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		this.littleParallelism = littleParallelism;
+
+		return this;
+	}
+
+	/*
+	 * Implementation notes:
+	 *
+	 * The requirement that "K extends CopyableValue<K>" can be removed when
+	 *   removed from LocalClusteringCoefficient.
+	 */
+
+	/**
+	 * Computes the local clustering coefficient of each vertex and attaches
+	 * an output which accumulates the vertex count and the sum of scores.
+	 * The accumulated values are read by {@link #getResult()} after the job
+	 * has been executed.
+	 *
+	 * @param input input graph
+	 * @return this
+	 * @throws Exception on error
+	 */
+	@Override
+	public AverageClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
+			.run(new LocalClusteringCoefficient<K, VV, EV>()
+				.setLittleParallelism(littleParallelism));
+
+		localClusteringCoefficient
+			.output(new AverageClusteringCoefficientHelper<K>(id))
+				.name("Average clustering coefficient");
+
+		return this;
+	}
+
+	/**
+	 * Combines the accumulators from the last job execution into a
+	 * {@link Result}. Requires that {@link #run(Graph)} was called and that
+	 * the job has since been executed; the accumulators are only registered
+	 * when the output format is closed.
+	 *
+	 * @return vertex count and average clustering coefficient
+	 */
+	@Override
+	public Result getResult() {
+		JobExecutionResult res = env.getLastJobExecutionResult();
+
+		long vertexCount = res.getAccumulatorResult(id + "-0");
+		double sumOfLocalClusteringCoefficient = res.getAccumulatorResult(id + "-1");
+
+		return new Result(vertexCount, sumOfLocalClusteringCoefficient);
+	}
+
+	/**
+	 * Helper class to collect the average clustering coefficient.
+	 * <p>
+	 * Counts every record, sums the scores of vertices with degree greater
+	 * than one, and publishes both totals as accumulators when closed.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class AverageClusteringCoefficientHelper<T>
+	extends RichOutputFormat<LocalClusteringCoefficient.Result<T>> {
+		private final String id;
+
+		private long vertexCount;
+		private double sumOfLocalClusteringCoefficient;
+
+		/**
+		 * The unique id is required because Flink's accumulator namespace is
+		 * shared among all operators.
+		 *
+		 * @param id unique string used for accumulator names
+		 */
+		public AverageClusteringCoefficientHelper(String id) {
+			this.id = id;
+		}
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {}
+
+		@Override
+		public void writeRecord(LocalClusteringCoefficient.Result<T> record) throws IOException {
+			// every vertex is part of the denominator of the average
+			vertexCount++;
+
+			// local clustering coefficient is only defined on vertices with
+			// at least two neighbors yielding at least one pair of neighbors
+			if (record.getDegree().getValue() > 1) {
+				sumOfLocalClusteringCoefficient += record.getLocalClusteringCoefficientScore();
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			// "-0" holds the vertex count and "-1" the sum of scores; read back in getResult()
+			getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
+			getRuntimeContext().addAccumulator(id + "-1", new DoubleCounter(sumOfLocalClusteringCoefficient));
+		}
+	}
+
+	/**
+	 * Wraps average clustering coefficient metrics.
+	 */
+	public static class Result {
+		private final long vertexCount;
+
+		private final double averageLocalClusteringCoefficient;
+
+		/**
+		 * Instantiate an immutable result.
+		 * <p>
+		 * The average is {@code sumOfLocalClusteringCoefficient / vertexCount}.
+		 * When both the vertex count and the sum are zero, the floating-point
+		 * division yields {@code NaN} rather than throwing.
+		 *
+		 * @param vertexCount vertex count
+		 * @param sumOfLocalClusteringCoefficient sum over the vertices' local
+		 *                                        clustering coefficients
+		 */
+		public Result(long vertexCount, double sumOfLocalClusteringCoefficient) {
+			this.vertexCount = vertexCount;
+			this.averageLocalClusteringCoefficient = sumOfLocalClusteringCoefficient / vertexCount;
+		}
+
+		/**
+		 * Get the number of vertices.
+		 *
+		 * @return number of vertices
+		 */
+		public long getNumberOfVertices() {
+			return vertexCount;
+		}
+
+		/**
+		 * Get the average clustering coefficient.
+		 *
+		 * @return average clustering coefficient
+		 */
+		public double getAverageClusteringCoefficient() {
+			return averageLocalClusteringCoefficient;
+		}
+
+		@Override
+		public String toString() {
+			return "vertex count: " + vertexCount
+				+ ", average clustering coefficient: " + averageLocalClusteringCoefficient;
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder()
+				.append(vertexCount)
+				.append(averageLocalClusteringCoefficient)
+				.hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == null) { return false; }
+			if (obj == this) { return true; }
+			if (obj.getClass() != getClass()) { return false; }
+
+			Result rhs = (Result)obj;
+
+			return new EqualsBuilder()
+				.append(vertexCount, rhs.vertexCount)
+				.append(averageLocalClusteringCoefficient, rhs.averageLocalClusteringCoefficient)
+				.isEquals();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
new file mode 100644
index 0000000..03dbc71
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * The average clustering coefficient measures the mean connectedness of a
+ * graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph).
+ * <p>
+ * The average is taken over all vertices. The local clustering coefficient
+ * is only defined for vertices with at least two neighbors; vertices with
+ * fewer neighbors are still counted in the denominator but add 0.0 to the
+ * sum of scores.
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class AverageClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+	// unique prefix for this analytic's accumulator names; fixed per instance
+	private final String id = new AbstractID().toString();
+
+	// Optional configuration
+	private int littleParallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Override the parallelism of operators processing small amounts of data.
+	 *
+	 * @param littleParallelism operator parallelism
+	 * @return this
+	 */
+	public AverageClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		this.littleParallelism = littleParallelism;
+
+		return this;
+	}
+
+	/*
+	 * Implementation notes:
+	 *
+	 * The requirement that "K extends CopyableValue<K>" can be removed when
+	 *   removed from LocalClusteringCoefficient.
+	 */
+
+	/**
+	 * Computes the local clustering coefficient of each vertex and attaches
+	 * an output which accumulates the vertex count and the sum of scores.
+	 * The accumulated values are read by {@link #getResult()} after the job
+	 * has been executed.
+	 *
+	 * @param input input graph
+	 * @return this
+	 * @throws Exception on error
+	 */
+	@Override
+	public AverageClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
+			.run(new LocalClusteringCoefficient<K, VV, EV>()
+				.setLittleParallelism(littleParallelism));
+
+		localClusteringCoefficient
+			.output(new AverageClusteringCoefficientHelper<K>(id))
+				.name("Average clustering coefficient");
+
+		return this;
+	}
+
+	/**
+	 * Combines the accumulators from the last job execution into a
+	 * {@link Result}. Requires that {@link #run(Graph)} was called and that
+	 * the job has since been executed; the accumulators are only registered
+	 * when the output format is closed.
+	 *
+	 * @return vertex count and average clustering coefficient
+	 */
+	@Override
+	public Result getResult() {
+		JobExecutionResult res = env.getLastJobExecutionResult();
+
+		long vertexCount = res.getAccumulatorResult(id + "-0");
+		double sumOfLocalClusteringCoefficient = res.getAccumulatorResult(id + "-1");
+
+		return new Result(vertexCount, sumOfLocalClusteringCoefficient);
+	}
+
+	/**
+	 * Helper class to collect the average clustering coefficient.
+	 * <p>
+	 * Counts every record, sums the scores of vertices with degree greater
+	 * than one, and publishes both totals as accumulators when closed.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class AverageClusteringCoefficientHelper<T>
+	extends RichOutputFormat<LocalClusteringCoefficient.Result<T>> {
+		private final String id;
+
+		private long vertexCount;
+		private double sumOfLocalClusteringCoefficient;
+
+		/**
+		 * The unique id is required because Flink's accumulator namespace is
+		 * shared among all operators.
+		 *
+		 * @param id unique string used for accumulator names
+		 */
+		public AverageClusteringCoefficientHelper(String id) {
+			this.id = id;
+		}
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {}
+
+		@Override
+		public void writeRecord(LocalClusteringCoefficient.Result<T> record) throws IOException {
+			// every vertex is part of the denominator of the average
+			vertexCount++;
+
+			// local clustering coefficient is only defined on vertices with
+			// at least two neighbors yielding at least one pair of neighbors
+			if (record.getDegree().getValue() > 1) {
+				sumOfLocalClusteringCoefficient += record.getLocalClusteringCoefficientScore();
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			// "-0" holds the vertex count and "-1" the sum of scores; read back in getResult()
+			getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
+			getRuntimeContext().addAccumulator(id + "-1", new DoubleCounter(sumOfLocalClusteringCoefficient));
+		}
+	}
+
+	/**
+	 * Wraps average clustering coefficient metrics.
+	 */
+	public static class Result {
+		private final long vertexCount;
+
+		private final double averageLocalClusteringCoefficient;
+
+		/**
+		 * Instantiate an immutable result.
+		 * <p>
+		 * The average is {@code sumOfLocalClusteringCoefficient / vertexCount}.
+		 * When both the vertex count and the sum are zero, the floating-point
+		 * division yields {@code NaN} rather than throwing.
+		 *
+		 * @param vertexCount vertex count
+		 * @param sumOfLocalClusteringCoefficient sum over the vertices' local
+		 *                                        clustering coefficients
+		 */
+		public Result(long vertexCount, double sumOfLocalClusteringCoefficient) {
+			this.vertexCount = vertexCount;
+			this.averageLocalClusteringCoefficient = sumOfLocalClusteringCoefficient / vertexCount;
+		}
+
+		/**
+		 * Get the number of vertices.
+		 *
+		 * @return number of vertices
+		 */
+		public long getNumberOfVertices() {
+			return vertexCount;
+		}
+
+		/**
+		 * Get the average clustering coefficient.
+		 *
+		 * @return average clustering coefficient
+		 */
+		public double getAverageClusteringCoefficient() {
+			return averageLocalClusteringCoefficient;
+		}
+
+		@Override
+		public String toString() {
+			return "vertex count: " + vertexCount
+				+ ", average clustering coefficient: " + averageLocalClusteringCoefficient;
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder()
+				.append(vertexCount)
+				.append(averageLocalClusteringCoefficient)
+				.hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == null) { return false; }
+			if (obj == this) { return true; }
+			if (obj.getClass() != getClass()) { return false; }
+
+			Result rhs = (Result)obj;
+
+			return new EqualsBuilder()
+				.append(vertexCount, rhs.vertexCount)
+				.append(averageLocalClusteringCoefficient, rhs.averageLocalClusteringCoefficient)
+				.isEquals();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 28e2669..8ef87a5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -48,6 +48,8 @@ public class AsmTestBase {
 	protected Graph<LongValue,NullValue,NullValue> completeGraph;
 
 	// empty graph
+	protected final long emptyGraphVertexCount = 3;
+
 	protected Graph<LongValue,NullValue,NullValue> emptyGraph;
 
 	// RMat graph
@@ -86,7 +88,7 @@ public class AsmTestBase {
 			.generate();
 
 		// empty graph
-		emptyGraph = new EmptyGraph(env, 3)
+		emptyGraph = new EmptyGraph(env, emptyGraphVertexCount)
 			.generate();
 
 		// RMat graph

http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java
new file mode 100644
index 0000000..9de9bac
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class AverageClusteringCoefficientTest
+extends AsmTestBase {
+
+	/**
+	 * The per-vertex scores summed here are those checked in
+	 * LocalClusteringCoefficientTest.testSimpleGraph.
+	 */
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		double sumOfScores = 1.0/2 + 2.0/6 + 2.0/6 + 1.0/12;
+		Result expected = new Result(6, sumOfScores);
+
+		AverageClusteringCoefficient<IntValue, NullValue, NullValue> analytic =
+			new AverageClusteringCoefficient<IntValue, NullValue, NullValue>();
+		Result actual = analytic.run(directedSimpleGraph).execute();
+
+		assertEquals(expected, actual);
+	}
+
+	/**
+	 * Every vertex of the complete graph scores 1.0, so the sum of scores
+	 * equals the vertex count.
+	 */
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		Result expected = new Result(completeGraphVertexCount, completeGraphVertexCount);
+
+		AverageClusteringCoefficient<LongValue, NullValue, NullValue> analytic =
+			new AverageClusteringCoefficient<LongValue, NullValue, NullValue>();
+		Result actual = analytic.run(completeGraph).execute();
+
+		assertEquals(expected, actual);
+	}
+
+	/**
+	 * Vertices of the empty graph are counted but add nothing to the sum
+	 * of scores.
+	 */
+	@Test
+	public void testWithEmptyGraph()
+			throws Exception {
+		Result expected = new Result(emptyGraphVertexCount, 0);
+
+		AverageClusteringCoefficient<LongValue, NullValue, NullValue> analytic =
+			new AverageClusteringCoefficient<LongValue, NullValue, NullValue>();
+		Result actual = analytic.run(emptyGraph).execute();
+
+		assertEquals(expected, actual);
+	}
+
+	/**
+	 * The expected sum of scores is given to limited precision, so the
+	 * average is compared within a tolerance.
+	 */
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		Result expected = new Result(902, 297.152607);
+		double tolerance = 0.000001;
+
+		AverageClusteringCoefficient<LongValue, NullValue, NullValue> analytic =
+			new AverageClusteringCoefficient<LongValue, NullValue, NullValue>();
+		Result actual = analytic.run(directedRMatGraph).execute();
+
+		assertEquals(expected.getNumberOfVertices(), actual.getNumberOfVertices());
+		assertEquals(expected.getAverageClusteringCoefficient(), actual.getAverageClusteringCoefficient(), tolerance);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java
new file mode 100644
index 0000000..34fda17
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class AverageClusteringCoefficientTest
+extends AsmTestBase {
+
+	/**
+	 * The per-vertex scores summed here are those checked in
+	 * LocalClusteringCoefficientTest.testSimpleGraph.
+	 */
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		double sumOfScores = 1.0/1 + 2.0/3 + 2.0/3 + 1.0/6;
+		Result expected = new Result(6, sumOfScores);
+
+		AverageClusteringCoefficient<IntValue, NullValue, NullValue> analytic =
+			new AverageClusteringCoefficient<IntValue, NullValue, NullValue>();
+		Result actual = analytic.run(undirectedSimpleGraph).execute();
+
+		assertEquals(expected, actual);
+	}
+
+	/**
+	 * Every vertex of the complete graph scores 1.0, so the sum of scores
+	 * equals the vertex count.
+	 */
+	@Test
+	public void testWithCompleteGraph()
+			throws Exception {
+		Result expected = new Result(completeGraphVertexCount, completeGraphVertexCount);
+
+		AverageClusteringCoefficient<LongValue, NullValue, NullValue> analytic =
+			new AverageClusteringCoefficient<LongValue, NullValue, NullValue>();
+		Result actual = analytic.run(completeGraph).execute();
+
+		assertEquals(expected, actual);
+	}
+
+	/**
+	 * Vertices of the empty graph are counted but add nothing to the sum
+	 * of scores.
+	 */
+	@Test
+	public void testWithEmptyGraph()
+			throws Exception {
+		Result expected = new Result(emptyGraphVertexCount, 0);
+
+		AverageClusteringCoefficient<LongValue, NullValue, NullValue> analytic =
+			new AverageClusteringCoefficient<LongValue, NullValue, NullValue>();
+		Result actual = analytic.run(emptyGraph).execute();
+
+		assertEquals(expected, actual);
+	}
+
+	/**
+	 * The expected sum of scores is given to limited precision, so the
+	 * average is compared within a tolerance.
+	 */
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		Result expected = new Result(902, 380.40109);
+		double tolerance = 0.000001;
+
+		AverageClusteringCoefficient<LongValue, NullValue, NullValue> analytic =
+			new AverageClusteringCoefficient<LongValue, NullValue, NullValue>();
+		Result actual = analytic.run(undirectedRMatGraph).execute();
+
+		assertEquals(expected.getNumberOfVertices(), actual.getNumberOfVertices());
+		assertEquals(expected.getAverageClusteringCoefficient(), actual.getAverageClusteringCoefficient(), tolerance);
+	}
+}