You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/10/03 14:55:06 UTC
[1/2] flink git commit: [FLINK-4624] [gelly] Support null values in
Graph Summarization
Repository: flink
Updated Branches:
refs/heads/master 9dbd1e3f7 -> 95c08eab3
[FLINK-4624] [gelly] Support null values in Graph Summarization
* Bug was caused by serializers that cannot handle null values (e.g. Long)
* VertexGroupItem now uses Either<VV, NullValue> instead of VV
* Generalized test cases
* Added tests for vertex/edge values of type Long
* Replaced Guava Lists.newArrayList() with new ArrayList<>()
This closes #2527
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a79efdc2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a79efdc2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a79efdc2
Branch: refs/heads/master
Commit: a79efdc23e1bd35eb7cd2e5eb9551cc297c02dd5
Parents: 9dbd1e3
Author: Martin Junghanns <ma...@gmx.net>
Authored: Wed Sep 21 13:31:41 2016 +0200
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Oct 3 10:36:43 2016 -0400
----------------------------------------------------------------------
.../graph/examples/data/SummarizationData.java | 101 ++++++++++---------
.../graph/library/SummarizationITCase.java | 90 ++++++++++-------
.../flink/graph/library/Summarization.java | 72 ++++++++-----
3 files changed, 160 insertions(+), 103 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/a79efdc2/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
index ea60ea0..703b66e 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/data/SummarizationData.java
@@ -18,12 +18,10 @@
package org.apache.flink.graph.examples.data;
-import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.NullValue;
import java.util.ArrayList;
import java.util.List;
@@ -35,6 +33,43 @@ public class SummarizationData {
private SummarizationData() {}
+
+ /**
+ * Vertices of the input graph.
+ *
+ * Format:
+ *
+ * "vertex-id;vertex-value"
+ */
+ private static final String[] INPUT_VERTICES = new String[] {
+ "0;1",
+ "1;1",
+ "2;2",
+ "3;2",
+ "4;2",
+ "5;3"
+ };
+
+ /**
+ * Edges of the input graph.
+ *
+ * Format:
+ *
+ * "source-id;target-id;edge-value"
+ */
+ private static final String[] INPUT_EDGES = new String[] {
+ "0;1;1",
+ "1;0;1",
+ "1;2;1",
+ "2;1;1",
+ "2;3;2",
+ "3;2;2",
+ "4;0;3",
+ "4;1;3",
+ "5;2;4",
+ "5;3;4"
+ };
+
/**
* The resulting vertex id can be any id of the vertices summarized by the single vertex.
*
@@ -43,9 +78,9 @@ public class SummarizationData {
* "possible-id[,possible-id];group-value,group-count"
*/
public static final String[] EXPECTED_VERTICES = new String[] {
- "0,1;A,2",
- "2,3,4;B,3",
- "5;C,1"
+ "0,1;1,2",
+ "2,3,4;2,3",
+ "5;3,1"
};
/**
@@ -54,12 +89,12 @@ public class SummarizationData {
* "possible-source-id[,possible-source-id];possible-target-id[,possible-target-id];group-value,group-count"
*/
public static final String[] EXPECTED_EDGES_WITH_VALUES = new String[] {
- "0,1;0,1;A,2",
- "0,1;2,3,4;A,1",
- "2,3,4;0,1;A,1",
- "2,3,4;0,1;C,2",
- "2,3,4;2,3,4;B,2",
- "5;2,3,4;D,2"
+ "0,1;0,1;1,2",
+ "0,1;2,3,4;1,1",
+ "2,3,4;0,1;1,1",
+ "2,3,4;0,1;3,2",
+ "2,3,4;2,3,4;2,2",
+ "5;2,3,4;4,2"
};
/**
@@ -82,13 +117,11 @@ public class SummarizationData {
* @return vertex data set with string values
*/
public static DataSet<Vertex<Long, String>> getVertices(ExecutionEnvironment env) {
- List<Vertex<Long, String>> vertices = new ArrayList<>(6);
- vertices.add(new Vertex<>(0L, "A"));
- vertices.add(new Vertex<>(1L, "A"));
- vertices.add(new Vertex<>(2L, "B"));
- vertices.add(new Vertex<>(3L, "B"));
- vertices.add(new Vertex<>(4L, "B"));
- vertices.add(new Vertex<>(5L, "C"));
+ List<Vertex<Long, String>> vertices = new ArrayList<>(INPUT_VERTICES.length);
+ for (String vertex : INPUT_VERTICES) {
+ String[] tokens = vertex.split(";");
+ vertices.add(new Vertex<>(Long.parseLong(tokens[0]), tokens[1]));
+ }
return env.fromCollection(vertices);
}
@@ -100,34 +133,12 @@ public class SummarizationData {
* @return edge data set with string values
*/
public static DataSet<Edge<Long, String>> getEdges(ExecutionEnvironment env) {
- List<Edge<Long, String>> edges = new ArrayList<>(10);
- edges.add(new Edge<>(0L, 1L, "A"));
- edges.add(new Edge<>(1L, 0L, "A"));
- edges.add(new Edge<>(1L, 2L, "A"));
- edges.add(new Edge<>(2L, 1L, "A"));
- edges.add(new Edge<>(2L, 3L, "B"));
- edges.add(new Edge<>(3L, 2L, "B"));
- edges.add(new Edge<>(4L, 0L, "C"));
- edges.add(new Edge<>(4L, 1L, "C"));
- edges.add(new Edge<>(5L, 2L, "D"));
- edges.add(new Edge<>(5L, 3L, "D"));
+ List<Edge<Long, String>> edges = new ArrayList<>(INPUT_EDGES.length);
+ for (String edge : INPUT_EDGES) {
+ String[] tokens = edge.split(";");
+ edges.add(new Edge<>(Long.parseLong(tokens[0]), Long.parseLong(tokens[1]), tokens[2]));
+ }
return env.fromCollection(edges);
}
-
- /**
- * Creates a set of edges with {@link NullValue} as edge value.
- *
- * @param env execution environment
- * @return edge data set with null values
- */
- @SuppressWarnings("serial")
- public static DataSet<Edge<Long, NullValue>> getEdgesWithAbsentValues(ExecutionEnvironment env) {
- return getEdges(env).map(new MapFunction<Edge<Long, String>, Edge<Long, NullValue>>() {
- @Override
- public Edge<Long, NullValue> map(Edge<Long, String> value) throws Exception {
- return new Edge<>(value.getSource(), value.getTarget(), NullValue.getInstance());
- }
- });
- }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a79efdc2/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
index 17ddcfa..43514dc 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/SummarizationITCase.java
@@ -18,12 +18,15 @@
package org.apache.flink.graph.library;
-import com.google.common.collect.Lists;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.io.LocalCollectionOutputFormat;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.translate.TranslateEdgeValues;
+import org.apache.flink.graph.asm.translate.TranslateFunction;
+import org.apache.flink.graph.asm.translate.TranslateVertexValues;
+import org.apache.flink.graph.asm.translate.translators.ToNullValue;
import org.apache.flink.graph.examples.data.SummarizationData;
import org.apache.flink.graph.library.Summarization.EdgeValue;
import org.apache.flink.test.util.MultipleProgramsTestBase;
@@ -32,6 +35,7 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Comparator;
@@ -53,16 +57,15 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
}
@Test
- public void testWithVertexAndEdgeValues() throws Exception {
+ public void testWithVertexAndEdgeStringValues() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, String, String> input = Graph.fromDataSet(
SummarizationData.getVertices(env),
SummarizationData.getEdges(env),
- env
- );
+ env);
- List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
- List<Edge<Long, EdgeValue<String>>> summarizedEdges = Lists.newArrayList();
+ List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = new ArrayList<>();
+ List<Edge<Long, EdgeValue<String>>> summarizedEdges = new ArrayList<>();
Graph<Long, Summarization.VertexValue<String>, EdgeValue<String>> output =
input.run(new Summarization<Long, String, String>());
@@ -77,16 +80,16 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
}
@Test
- public void testWithVertexAndAbsentEdgeValues() throws Exception {
+ public void testWithVertexAndAbsentEdgeStringValues() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, String, NullValue> input = Graph.fromDataSet(
SummarizationData.getVertices(env),
- SummarizationData.getEdgesWithAbsentValues(env),
- env
- );
+ SummarizationData.getEdges(env),
+ env)
+ .run(new TranslateEdgeValues<Long, String, String, NullValue>(new ToNullValue<String>()));
- List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = Lists.newArrayList();
- List<Edge<Long, EdgeValue<NullValue>>> summarizedEdges = Lists.newArrayList();
+ List<Vertex<Long, Summarization.VertexValue<String>>> summarizedVertices = new ArrayList<>();
+ List<Edge<Long, EdgeValue<NullValue>>> summarizedEdges = new ArrayList<>();
Graph<Long, Summarization.VertexValue<String>, EdgeValue<NullValue>> output =
input.run(new Summarization<Long, String, NullValue>());
@@ -100,23 +103,41 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
validateEdges(SummarizationData.EXPECTED_EDGES_ABSENT_VALUES, summarizedEdges);
}
- private void validateVertices(String[] expectedVertices,
- List<Vertex<Long, Summarization.VertexValue<String>>> actualVertices) {
+ @Test
+ public void testWithVertexAndEdgeLongValues() throws Exception {
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ Graph<Long, Long, Long> input = Graph.fromDataSet(
+ SummarizationData.getVertices(env),
+ SummarizationData.getEdges(env),
+ env)
+ .run(new TranslateVertexValues<Long, String, Long, String>(new StringToLong()))
+ .run(new TranslateEdgeValues<Long, Long, String, Long>(new StringToLong()));
+
+ List<Vertex<Long, Summarization.VertexValue<Long>>> summarizedVertices = new ArrayList<>();
+ List<Edge<Long, EdgeValue<Long>>> summarizedEdges = new ArrayList<>();
+
+ Graph<Long, Summarization.VertexValue<Long>, EdgeValue<Long>> output =
+ input.run(new Summarization<Long, Long, Long>());
+
+ output.getVertices().output(new LocalCollectionOutputFormat<>(summarizedVertices));
+ output.getEdges().output(new LocalCollectionOutputFormat<>(summarizedEdges));
+
+ env.execute();
+
+ validateVertices(SummarizationData.EXPECTED_VERTICES, summarizedVertices);
+ validateEdges(SummarizationData.EXPECTED_EDGES_WITH_VALUES, summarizedEdges);
+ }
+
+ private <VV extends Comparable<VV>> void validateVertices(String[] expectedVertices, List<Vertex<Long, Summarization.VertexValue<VV>>> actualVertices) {
Arrays.sort(expectedVertices);
- Collections.sort(actualVertices, new Comparator<Vertex<Long, Summarization.VertexValue<String>>>() {
+ Collections.sort(actualVertices, new Comparator<Vertex<Long, Summarization.VertexValue<VV>>>() {
@Override
- public int compare(Vertex<Long, Summarization.VertexValue<String>> o1,
- Vertex<Long, Summarization.VertexValue<String>> o2) {
+ public int compare(Vertex<Long, Summarization.VertexValue<VV>> o1, Vertex<Long, Summarization.VertexValue<VV>> o2) {
int result = o1.getId().compareTo(o2.getId());
if (result == 0) {
result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
}
- if (result == 0) {
- result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
- }
- if (result == 0) {
- result = o1.getValue().getVertexGroupValue().compareTo(o2.getValue().getVertexGroupValue());
- }
return result;
}
});
@@ -126,8 +147,7 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
}
}
- private <EV extends Comparable<EV>> void validateEdges(String[] expectedEdges,
- List<Edge<Long, EdgeValue<EV>>> actualEdges) {
+ private <EV extends Comparable<EV>> void validateEdges(String[] expectedEdges, List<Edge<Long, EdgeValue<EV>>> actualEdges) {
Arrays.sort(expectedEdges);
Collections.sort(actualEdges, new Comparator<Edge<Long, EdgeValue<EV>>> () {
@@ -138,14 +158,8 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
result = o1.getTarget().compareTo(o2.getTarget());
}
if (result == 0) {
- result = o1.getTarget().compareTo(o2.getTarget());
- }
- if (result == 0) {
result = o1.getValue().getEdgeGroupValue().compareTo(o2.getValue().getEdgeGroupValue());
}
- if (result == 0) {
- result = o1.getValue().getEdgeGroupCount().compareTo(o2.getValue().getEdgeGroupCount());
- }
return result;
}
});
@@ -155,10 +169,10 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
}
}
- private void validateVertex(String expected, Vertex<Long, Summarization.VertexValue<String>> actual) {
+ private <VV> void validateVertex(String expected, Vertex<Long, Summarization.VertexValue<VV>> actual) {
String[] tokens = TOKEN_SEPARATOR.split(expected);
assertTrue(getListFromIdRange(tokens[0]).contains(actual.getId()));
- assertEquals(getGroupValue(tokens[1]), actual.getValue().getVertexGroupValue());
+ assertEquals(getGroupValue(tokens[1]), actual.getValue().getVertexGroupValue().toString());
assertEquals(getGroupCount(tokens[1]), actual.getValue().getVertexGroupCount());
}
@@ -171,7 +185,7 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
}
private List<Long> getListFromIdRange(String idRange) {
- List<Long> result = Lists.newArrayList();
+ List<Long> result = new ArrayList<>();
for (String id : ID_SEPARATOR.split(idRange)) {
result.add(Long.parseLong(id));
}
@@ -185,4 +199,12 @@ public class SummarizationITCase extends MultipleProgramsTestBase {
private Long getGroupCount(String token) {
return Long.valueOf(ID_SEPARATOR.split(token)[1]);
}
+
+ private static class StringToLong implements TranslateFunction<String, Long> {
+
+ @Override
+ public Long translate(String value, Long reuse) throws Exception {
+ return Long.parseLong(value);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/a79efdc2/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
index 0dcdc1f..2361247 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/Summarization.java
@@ -22,16 +22,23 @@ import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.GroupReduceFunction;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.RichGroupReduceFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.functions.FunctionAnnotation;
import org.apache.flink.api.java.operators.GroupReduceOperator;
import org.apache.flink.api.java.operators.UnsortedGrouping;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Either;
+import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
/**
@@ -90,8 +97,18 @@ public class Summarization<K, VV, EV>
@Override
public Graph<K, VertexValue<VV>, EdgeValue<EV>> run(Graph<K, VV, EV> input) throws Exception {
+ // create type infos for correct return type definitions
+ // Note: this use of type hints is only required due to
+ // limitations of the type parser for the Either type
+ // which are being fixed in FLINK-4673
+ TypeInformation<K> keyType = ((TupleTypeInfo<?>) input.getVertices().getType()).getTypeAt(0);
+ TypeInformation<VV> valueType = ((TupleTypeInfo<?>) input.getVertices().getType()).getTypeAt(1);
+ @SuppressWarnings("unchecked")
+ TupleTypeInfo<Vertex<K, VertexValue<VV>>> vertexType = (TupleTypeInfo<Vertex<K, VertexValue<VV>>>) new TupleTypeInfo(
+ Vertex.class, keyType, new TupleTypeInfo(VertexValue.class, valueType, BasicTypeInfo.LONG_TYPE_INFO));
+
// -------------------------
- // build summarized vertices
+ // build super vertices
// -------------------------
// group vertices by value
@@ -100,19 +117,20 @@ public class Summarization<K, VV, EV>
// reduce vertex group and create vertex group items
GroupReduceOperator<Vertex<K, VV>, VertexGroupItem<K, VV>> vertexGroupItems = vertexUnsortedGrouping
.reduceGroup(new VertexGroupReducer<K, VV>());
- // create summarized vertices
+ // create super vertices
DataSet<Vertex<K, VertexValue<VV>>> summarizedVertices = vertexGroupItems
.filter(new VertexGroupItemToSummarizedVertexFilter<K, VV>())
- .map(new VertexGroupItemToSummarizedVertexMapper<K, VV>());
- // create mapping between vertices and their representative
- DataSet<VertexWithRepresentative<K>> vertexToRepresentativeMap = vertexGroupItems
- .filter(new VertexGroupItemToRepresentativeFilter<K, VV>())
- .map(new VertexGroupItemToVertexWithRepresentativeMapper<K, VV>());
+ .map(new VertexGroupItemToSummarizedVertexMapper<K, VV>())
+ .returns(vertexType);
// -------------------------
- // build summarized edges
+ // build super edges
// -------------------------
+ // create mapping between vertices and their representative
+ DataSet<VertexWithRepresentative<K>> vertexToRepresentativeMap = vertexGroupItems
+ .filter(new VertexGroupItemToRepresentativeFilter<K, VV>())
+ .map(new VertexGroupItemToVertexWithRepresentativeMapper<K, VV>());
// join edges with vertex representatives and update source and target identifiers
DataSet<Edge<K, EV>> edgesForGrouping = input.getEdges()
.join(vertexToRepresentativeMap)
@@ -123,7 +141,7 @@ public class Summarization<K, VV, EV>
.where(1) // target vertex id
.equalTo(0) // vertex id
.with(new TargetVertexJoinFunction<K, EV>());
- // create summarized edges
+ // create super edges
DataSet<Edge<K, EdgeValue<EV>>> summarizedEdges = edgesForGrouping
.groupBy(0, 1, 2) // group by source id (0), target id (1) and edge value (2)
.reduceGroup(new EdgeGroupReducer<K, EV>());
@@ -203,10 +221,12 @@ public class Summarization<K, VV, EV>
* @param <VGV> vertex group value type
*/
@SuppressWarnings("serial")
- public static final class VertexGroupItem<K, VGV> extends Tuple4<K, K, VGV, Long> {
+ public static final class VertexGroupItem<K, VGV> extends Tuple4<K, K, Either<VGV, NullValue>, Long> {
+
+ private final Either.Right<VGV, NullValue> nullValue = new Either.Right<>(NullValue.getInstance());
public VertexGroupItem() {
- setVertexGroupCount(0L);
+ reset();
}
public K getVertexId() {
@@ -226,11 +246,15 @@ public class Summarization<K, VV, EV>
}
public VGV getVertexGroupValue() {
- return f2;
+ return f2.isLeft() ? f2.left() : null;
}
public void setVertexGroupValue(VGV vertexGroupValue) {
- f2 = vertexGroupValue;
+ if (vertexGroupValue == null) {
+ f2 = nullValue;
+ } else {
+ f2 = new Either.Left<>(vertexGroupValue);
+ }
}
public Long getVertexGroupCount() {
@@ -247,7 +271,7 @@ public class Summarization<K, VV, EV>
public void reset() {
f0 = null;
f1 = null;
- f2 = null;
+ f2 = nullValue;
f3 = 0L;
}
}
@@ -290,11 +314,13 @@ public class Summarization<K, VV, EV>
*/
@SuppressWarnings("serial")
private static final class VertexGroupReducer<K, VV>
- implements GroupReduceFunction<Vertex<K, VV>, VertexGroupItem<K, VV>> {
+ extends RichGroupReduceFunction<Vertex<K, VV>, VertexGroupItem<K, VV>> {
- private final VertexGroupItem<K, VV> reuseVertexGroupItem;
+ private transient VertexGroupItem<K, VV> reuseVertexGroupItem;
- private VertexGroupReducer() {
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ super.open(parameters);
this.reuseVertexGroupItem = new VertexGroupItem<>();
}
@@ -329,12 +355,10 @@ public class Summarization<K, VV, EV>
* group.
*
* @param vertexGroupRepresentativeId group representative vertex identifier
- * @param vertexGroupValue group property value
- * @param vertexGroupCount total group count
+ * @param vertexGroupValue group property value
+ * @param vertexGroupCount total group count
*/
- private void createGroupRepresentativeTuple(K vertexGroupRepresentativeId,
- VV vertexGroupValue,
- Long vertexGroupCount) {
+ private void createGroupRepresentativeTuple(K vertexGroupRepresentativeId, VV vertexGroupValue, Long vertexGroupCount) {
reuseVertexGroupItem.setVertexId(vertexGroupRepresentativeId);
reuseVertexGroupItem.setVertexGroupValue(vertexGroupValue);
reuseVertexGroupItem.setVertexGroupCount(vertexGroupCount);
@@ -511,9 +535,9 @@ public class Summarization<K, VV, EV>
@FunctionAnnotation.ForwardedFieldsSecond("f1") // vertex group id -> edge target id
private static final class TargetVertexJoinFunction<K, EV>
implements JoinFunction<Edge<K, EV>, VertexWithRepresentative<K>, Edge<K, EV>> {
+
@Override
- public Edge<K, EV> join(Edge<K, EV> edge,
- VertexWithRepresentative<K> vertexRepresentative) throws Exception {
+ public Edge<K, EV> join(Edge<K, EV> edge, VertexWithRepresentative<K> vertexRepresentative) throws Exception {
edge.setTarget(vertexRepresentative.getGroupRepresentativeId());
return edge;
}
[2/2] flink git commit: [FLINK-4643] [gelly] Average Clustering
Coefficient
Posted by gr...@apache.org.
[FLINK-4643] [gelly] Average Clustering Coefficient
Directed and undirected analytics computing the average clustering
coefficient over the vertices of a graph, plus an updated
ClusteringCoefficient driver that reports the new result.
This closes #2528
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/95c08eab
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/95c08eab
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/95c08eab
Branch: refs/heads/master
Commit: 95c08eab36bfa09c501a84f5b5f116d666d03ae1
Parents: a79efdc
Author: Greg Hogan <co...@greghogan.com>
Authored: Tue Sep 20 12:00:04 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Mon Oct 3 10:37:02 2016 -0400
----------------------------------------------------------------------
.../graph/examples/ClusteringCoefficient.java | 32 ++-
.../directed/AverageClusteringCoefficient.java | 212 +++++++++++++++++++
.../AverageClusteringCoefficient.java | 212 +++++++++++++++++++
.../org/apache/flink/graph/asm/AsmTestBase.java | 4 +-
.../AverageClusteringCoefficientTest.java | 82 +++++++
.../AverageClusteringCoefficientTest.java | 82 +++++++
6 files changed, 622 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
index 7835531..615d765 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/ClusteringCoefficient.java
@@ -102,6 +102,7 @@ public class ClusteringCoefficient {
// global and local clustering coefficient results
GraphAnalytic gcc;
+ GraphAnalytic acc;
DataSet lcc;
switch (parameters.get("input", "")) {
@@ -127,6 +128,9 @@ public class ClusteringCoefficient {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = graph
+ .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
@@ -134,6 +138,9 @@ public class ClusteringCoefficient {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = graph
+ .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
@@ -148,6 +155,9 @@ public class ClusteringCoefficient {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = graph
+ .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = graph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
@@ -155,6 +165,9 @@ public class ClusteringCoefficient {
gcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<StringValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = graph
+ .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<StringValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = graph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<StringValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
@@ -189,6 +202,9 @@ public class ClusteringCoefficient {
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = newGraph
+ .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false)
@@ -203,6 +219,9 @@ public class ClusteringCoefficient {
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = newGraph
+ .run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false)
@@ -219,6 +238,9 @@ public class ClusteringCoefficient {
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = newGraph
+ .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false)
@@ -233,6 +255,9 @@ public class ClusteringCoefficient {
gcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setLittleParallelism(little_parallelism));
+ acc = newGraph
+ .run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+ .setLittleParallelism(little_parallelism));
lcc = newGraph
.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<IntValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false)
@@ -262,11 +287,13 @@ public class ClusteringCoefficient {
}
}
System.out.println(gcc.getResult());
+ System.out.println(acc.getResult());
break;
case "hash":
System.out.println(DataSetUtils.checksumHashCode(lcc));
System.out.println(gcc.getResult());
+ System.out.println(acc.getResult());
break;
case "csv":
@@ -280,7 +307,10 @@ public class ClusteringCoefficient {
lcc.writeAsCsv(filename, lineDelimiter, fieldDelimiter);
- System.out.println(gcc.execute());
+ env.execute("Clustering Coefficient");
+
+ System.out.println(gcc.getResult());
+ System.out.println(acc.getResult());
break;
default:
http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
new file mode 100644
index 0000000..f589d04
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * The average clustering coefficient measures the mean connectedness of a
+ * graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph).
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class AverageClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+	private final String id = new AbstractID().toString();
+
+	// Optional configuration
+	private int littleParallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Override the parallelism of operators processing small amounts of data.
+	 *
+	 * @param littleParallelism operator parallelism
+	 * @return this
+	 */
+	public AverageClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		this.littleParallelism = littleParallelism;
+
+		return this;
+	}
+
+	/*
+	 * Implementation notes:
+	 *
+	 * The requirement that "K extends CopyableValue<K>" can be removed when
+	 * removed from LocalClusteringCoefficient.
+	 */
+
+	@Override
+	public AverageClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
+			.run(new LocalClusteringCoefficient<K, VV, EV>()
+				.setLittleParallelism(littleParallelism));
+
+		localClusteringCoefficient
+			.output(new AverageClusteringCoefficientHelper<K>(id))
+				.name("Average clustering coefficient");
+
+		return this;
+	}
+
+	@Override
+	public Result getResult() {
+		JobExecutionResult res = env.getLastJobExecutionResult();
+
+		long vertexCount = res.getAccumulatorResult(id + "-0");
+		double sumOfLocalClusteringCoefficient = res.getAccumulatorResult(id + "-1");
+
+		return new Result(vertexCount, sumOfLocalClusteringCoefficient);
+	}
+
+	/**
+	 * Helper class to collect the average clustering coefficient.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class AverageClusteringCoefficientHelper<T>
+	extends RichOutputFormat<LocalClusteringCoefficient.Result<T>> {
+		private final String id;
+
+		private long vertexCount;
+		private double sumOfLocalClusteringCoefficient;
+
+		/**
+		 * The unique id is required because Flink's accumulator namespace is
+		 * shared among all operators.
+		 *
+		 * @param id unique string used for accumulator names
+		 */
+		public AverageClusteringCoefficientHelper(String id) {
+			this.id = id;
+		}
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {}
+
+		@Override
+		public void writeRecord(LocalClusteringCoefficient.Result<T> record) throws IOException {
+			vertexCount++;
+
+			// local clustering coefficient is only defined for vertices with at
+			// least two neighbors; other vertices add zero but are still counted
+			if (record.getDegree().getValue() > 1) {
+				sumOfLocalClusteringCoefficient += record.getLocalClusteringCoefficientScore();
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
+			getRuntimeContext().addAccumulator(id + "-1", new DoubleCounter(sumOfLocalClusteringCoefficient));
+		}
+	}
+
+	/**
+	 * Wraps average clustering coefficient metrics.
+	 */
+	public static class Result {
+		private final long vertexCount;
+
+		private final double averageLocalClusteringCoefficient;
+
+		/**
+		 * Instantiate an immutable result; the average is NaN when vertexCount is 0.
+		 *
+		 * @param vertexCount vertex count
+		 * @param sumOfLocalClusteringCoefficient sum over the vertices' local
+		 * clustering coefficients
+		 */
+		public Result(long vertexCount, double sumOfLocalClusteringCoefficient) {
+			this.vertexCount = vertexCount;
+			this.averageLocalClusteringCoefficient = (vertexCount == 0) ? Double.NaN : sumOfLocalClusteringCoefficient / vertexCount;
+		}
+
+		/**
+		 * Get the number of vertices.
+		 *
+		 * @return number of vertices
+		 */
+		public long getNumberOfVertices() {
+			return vertexCount;
+		}
+
+		/**
+		 * Get the average clustering coefficient.
+		 *
+		 * @return average local clustering coefficient
+		 */
+		public double getAverageClusteringCoefficient() {
+			return averageLocalClusteringCoefficient;
+		}
+
+		@Override
+		public String toString() {
+			return "vertex count: " + vertexCount
+				+ ", average clustering coefficient: " + averageLocalClusteringCoefficient;
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder()
+				.append(vertexCount)
+				.append(averageLocalClusteringCoefficient)
+				.hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == null) { return false; }
+			if (obj == this) { return true; }
+			if (obj.getClass() != getClass()) { return false; }
+
+			Result rhs = (Result)obj;
+
+			return new EqualsBuilder()
+				.append(vertexCount, rhs.vertexCount)
+				.append(averageLocalClusteringCoefficient, rhs.averageLocalClusteringCoefficient)
+				.isEquals();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
new file mode 100644
index 0000000..03dbc71
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.accumulators.DoubleCounter;
+import org.apache.flink.api.common.accumulators.LongCounter;
+import org.apache.flink.api.common.io.RichOutputFormat;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.AbstractGraphAnalytic;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.AbstractID;
+
+import java.io.IOException;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * The average clustering coefficient measures the mean connectedness of a
+ * graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph).
+ *
+ * @param <K> graph ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class AverageClusteringCoefficient<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+extends AbstractGraphAnalytic<K, VV, EV, Result> {
+
+	private final String id = new AbstractID().toString();
+
+	// Optional configuration
+	private int littleParallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Override the parallelism of operators processing small amounts of data.
+	 *
+	 * @param littleParallelism operator parallelism
+	 * @return this
+	 */
+	public AverageClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		this.littleParallelism = littleParallelism;
+
+		return this;
+	}
+
+	/*
+	 * Implementation notes:
+	 *
+	 * The requirement that "K extends CopyableValue<K>" can be removed when
+	 * removed from LocalClusteringCoefficient.
+	 */
+
+	@Override
+	public AverageClusteringCoefficient<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		super.run(input);
+
+		DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
+			.run(new LocalClusteringCoefficient<K, VV, EV>()
+				.setLittleParallelism(littleParallelism));
+
+		localClusteringCoefficient
+			.output(new AverageClusteringCoefficientHelper<K>(id))
+				.name("Average clustering coefficient");
+
+		return this;
+	}
+
+	@Override
+	public Result getResult() {
+		JobExecutionResult res = env.getLastJobExecutionResult();
+
+		long vertexCount = res.getAccumulatorResult(id + "-0");
+		double sumOfLocalClusteringCoefficient = res.getAccumulatorResult(id + "-1");
+
+		return new Result(vertexCount, sumOfLocalClusteringCoefficient);
+	}
+
+	/**
+	 * Helper class to collect the average clustering coefficient.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class AverageClusteringCoefficientHelper<T>
+	extends RichOutputFormat<LocalClusteringCoefficient.Result<T>> {
+		private final String id;
+
+		private long vertexCount;
+		private double sumOfLocalClusteringCoefficient;
+
+		/**
+		 * The unique id is required because Flink's accumulator namespace is
+		 * shared among all operators.
+		 *
+		 * @param id unique string used for accumulator names
+		 */
+		public AverageClusteringCoefficientHelper(String id) {
+			this.id = id;
+		}
+
+		@Override
+		public void configure(Configuration parameters) {}
+
+		@Override
+		public void open(int taskNumber, int numTasks) throws IOException {}
+
+		@Override
+		public void writeRecord(LocalClusteringCoefficient.Result<T> record) throws IOException {
+			vertexCount++;
+
+			// local clustering coefficient is only defined for vertices with at
+			// least two neighbors; other vertices add zero but are still counted
+			if (record.getDegree().getValue() > 1) {
+				sumOfLocalClusteringCoefficient += record.getLocalClusteringCoefficientScore();
+			}
+		}
+
+		@Override
+		public void close() throws IOException {
+			getRuntimeContext().addAccumulator(id + "-0", new LongCounter(vertexCount));
+			getRuntimeContext().addAccumulator(id + "-1", new DoubleCounter(sumOfLocalClusteringCoefficient));
+		}
+	}
+
+	/**
+	 * Wraps average clustering coefficient metrics.
+	 */
+	public static class Result {
+		private final long vertexCount;
+
+		private final double averageLocalClusteringCoefficient;
+
+		/**
+		 * Instantiate an immutable result; the average is NaN when vertexCount is 0.
+		 *
+		 * @param vertexCount vertex count
+		 * @param sumOfLocalClusteringCoefficient sum over the vertices' local
+		 * clustering coefficients
+		 */
+		public Result(long vertexCount, double sumOfLocalClusteringCoefficient) {
+			this.vertexCount = vertexCount;
+			this.averageLocalClusteringCoefficient = (vertexCount == 0) ? Double.NaN : sumOfLocalClusteringCoefficient / vertexCount;
+		}
+
+		/**
+		 * Get the number of vertices.
+		 *
+		 * @return number of vertices
+		 */
+		public long getNumberOfVertices() {
+			return vertexCount;
+		}
+
+		/**
+		 * Get the average clustering coefficient.
+		 *
+		 * @return average local clustering coefficient
+		 */
+		public double getAverageClusteringCoefficient() {
+			return averageLocalClusteringCoefficient;
+		}
+
+		@Override
+		public String toString() {
+			return "vertex count: " + vertexCount
+				+ ", average clustering coefficient: " + averageLocalClusteringCoefficient;
+		}
+
+		@Override
+		public int hashCode() {
+			return new HashCodeBuilder()
+				.append(vertexCount)
+				.append(averageLocalClusteringCoefficient)
+				.hashCode();
+		}
+
+		@Override
+		public boolean equals(Object obj) {
+			if (obj == null) { return false; }
+			if (obj == this) { return true; }
+			if (obj.getClass() != getClass()) { return false; }
+
+			Result rhs = (Result)obj;
+
+			return new EqualsBuilder()
+				.append(vertexCount, rhs.vertexCount)
+				.append(averageLocalClusteringCoefficient, rhs.averageLocalClusteringCoefficient)
+				.isEquals();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 28e2669..8ef87a5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -48,6 +48,8 @@ public class AsmTestBase {
protected Graph<LongValue,NullValue,NullValue> completeGraph;
// empty graph
+ protected final long emptyGraphVertexCount = 3;
+
protected Graph<LongValue,NullValue,NullValue> emptyGraph;
// RMat graph
@@ -86,7 +88,7 @@ public class AsmTestBase {
.generate();
// empty graph
- emptyGraph = new EmptyGraph(env, 3)
+ emptyGraph = new EmptyGraph(env, emptyGraphVertexCount)
.generate();
// RMat graph
http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java
new file mode 100644
index 0000000..9de9bac
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficientTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.directed;
+
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class AverageClusteringCoefficientTest extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph() throws Exception {
+		// see results in LocalClusteringCoefficientTest.testSimpleGraph
+		Result expectedResult = new Result(6, 1.0/2 + 2.0/6 + 2.0/6 + 1.0/12);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+			.run(directedSimpleGraph)
+			.execute();
+
+		assertResultEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithCompleteGraph() throws Exception {
+		Result expectedResult = new Result(completeGraphVertexCount, completeGraphVertexCount);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(completeGraph)
+			.execute();
+
+		assertResultEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithEmptyGraph() throws Exception {
+		Result expectedResult = new Result(emptyGraphVertexCount, 0);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(emptyGraph)
+			.execute();
+
+		assertResultEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithRMatGraph() throws Exception {
+		Result expectedResult = new Result(902, 297.152607);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(directedRMatGraph)
+			.execute();
+
+		assertResultEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	// compare averages with a tolerance: the order in which scores are summed is not fixed
+	private static void assertResultEquals(Result expected, Result actual) {
+		assertEquals(expected.getNumberOfVertices(), actual.getNumberOfVertices());
+		assertEquals(expected.getAverageClusteringCoefficient(), actual.getAverageClusteringCoefficient(), 0.000001);
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/95c08eab/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java
new file mode 100644
index 0000000..34fda17
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficientTest.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library.clustering.undirected;
+
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient.Result;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class AverageClusteringCoefficientTest extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph() throws Exception {
+		// see results in LocalClusteringCoefficientTest.testSimpleGraph
+		Result expectedResult = new Result(6, 1.0/1 + 2.0/3 + 2.0/3 + 1.0/6);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<IntValue, NullValue, NullValue>()
+			.run(undirectedSimpleGraph)
+			.execute();
+
+		assertResultEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithCompleteGraph() throws Exception {
+		Result expectedResult = new Result(completeGraphVertexCount, completeGraphVertexCount);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(completeGraph)
+			.execute();
+
+		assertResultEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithEmptyGraph() throws Exception {
+		Result expectedResult = new Result(emptyGraphVertexCount, 0);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(emptyGraph)
+			.execute();
+
+		assertResultEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	@Test
+	public void testWithRMatGraph() throws Exception {
+		Result expectedResult = new Result(902, 380.40109);
+
+		Result averageClusteringCoefficient = new AverageClusteringCoefficient<LongValue, NullValue, NullValue>()
+			.run(undirectedRMatGraph)
+			.execute();
+
+		assertResultEquals(expectedResult, averageClusteringCoefficient);
+	}
+
+	// compare averages with a tolerance: the order in which scores are summed is not fixed
+	private static void assertResultEquals(Result expected, Result actual) {
+		assertEquals(expected.getNumberOfVertices(), actual.getNumberOfVertices());
+		assertEquals(expected.getAverageClusteringCoefficient(), actual.getAverageClusteringCoefficient(), 0.000001);
+	}
+}