Posted to commits@flink.apache.org by va...@apache.org on 2015/10/18 11:27:21 UTC
[2/2] flink git commit: [FLINK-2714] [gelly] add the algorithm description in the gelly docs; update test to get the directed graph as input
[FLINK-2714] [gelly] add the algorithm description in the gelly docs;
update test to get the directed graph as input
This closes #1250
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d047ddb4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d047ddb4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d047ddb4
Branch: refs/heads/master
Commit: d047ddb494460b3573514d4035341a247fbcb9c0
Parents: b443cb6
Author: vasia <va...@apache.org>
Authored: Sun Oct 18 10:48:01 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sun Oct 18 10:54:59 2015 +0200
----------------------------------------------------------------------
docs/libs/gelly_guide.md | 47 +++++++++++---------
.../flink/graph/library/TriangleEnumerator.java | 19 ++++++--
.../test/library/TriangleEnumeratorITCase.java | 2 +-
3 files changed, 43 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d047ddb4/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 2c6ce56..13d304d 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -1403,15 +1403,16 @@ Library Methods
-----------
Gelly has a growing collection of graph algorithms for easily analyzing large-scale Graphs. So far, the following library methods are implemented:
-* PageRank
-* Single-Source Shortest Paths
-* Label Propagation
-* Simple Community Detection
-* Connected Components
-* GSA PageRank
-* GSA Connected Components
-* GSA Single-Source Shortest Paths
-* GSA Triangle Count
+* [Community Detection](#community-detection)
+* [Label Propagation](#label-propagation)
+* [Connected Components](#connected-components)
+* [GSA Connected Components](#gsa-connected-components)
+* [PageRank](#pagerank)
+* [GSA PageRank](#gsa-pagerank)
+* [Single Source Shortest Paths](#single-source-shortest-paths)
+* [GSA Single Source Shortest Paths](#gsa-single-source-shortest-paths)
+* [GSA Triangle Count](#gsa-triangle-count)
+* [Triangle Enumerator](#triangle-enumerator)
Gelly's library methods can be used by simply calling the `run()` method on the input graph:
@@ -1423,13 +1424,11 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, NullValue> graph = ...
// run Label Propagation for 30 iterations to detect communities on the input graph
-DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(
- new LabelPropagation<Long>(30)).getVertices();
+DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(new LabelPropagation<Long>(30));
// print the result
verticesWithCommunity.print();
-env.execute();
{% endhighlight %}
</div>
@@ -1445,7 +1444,6 @@ val verticesWithCommunity = graph.run(new LabelPropagation[Long](30))
// print the result
verticesWithCommunity.print
-env.execute
{% endhighlight %}
</div>
</div>
@@ -1473,7 +1471,6 @@ The constructor takes two parameters:
* `maxIterations`: the maximum number of iterations to run.
* `delta`: the hop attenuation parameter, with default value 0.5.
-
### Label Propagation
#### Overview
@@ -1512,14 +1509,12 @@ The constructor takes one parameter:
* `maxIterations`: the maximum number of iterations to run.
-
### GSA Connected Components
The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-apply-iterations).
See the [Connected Components](#connected-components) library method for implementation details and usage information.
-
### PageRank
#### Overview
@@ -1538,14 +1533,12 @@ The constructors take the following parameters:
* `maxIterations`: the maximum number of iterations to run.
* `numVertices`: the number of vertices in the input. If known beforehand, is it advised to provide this argument to speed up execution.
-
### GSA PageRank
The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-apply-iterations).
See the [PageRank](#pagerank) library method for implementation details and usage information.
-
### Single Source Shortest Paths
#### Overview
@@ -1563,14 +1556,12 @@ The constructor takes two parameters:
* `srcVertexId` The vertex ID of the source vertex.
* `maxIterations`: the maximum number of iterations to run.
-
### GSA Single Source Shortest Paths
The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-apply-iterations).
See the [Single Source Shortest Paths](#single-source-shortest-paths) library method for implementation details and usage information.
-
### GSA Triangle Count
#### Overview
@@ -1585,5 +1576,21 @@ Finally, if a node encounters the target ID in the list of received messages, it
The algorithm takes an undirected, unweighted graph as input and outputs a `DataSet` which contains a single integer corresponding to the number of triangles
in the graph. The algorithm constructor takes no arguments.
+### Triangle Enumerator
+
+#### Overview
+This library method enumerates unique triangles present in the input graph. A triangle consists of three edges that connect three vertices with each other.
+This implementation ignores edge directions.
+
+#### Details
+The basic triangle enumeration algorithm groups all edges that share a common vertex and builds triads, i.e., triples of vertices
+that are connected by two edges. Then, all triads for which no third edge exists to close the triangle are filtered out.
+For a group of <i>n</i> edges that share a common vertex, the number of triads built is quadratic: <i>n(n-1)/2</i>.
+Therefore, an optimization of the algorithm is to group each edge on the vertex with the smaller output degree to reduce the number of triads.
+This implementation extends the basic algorithm by computing the output degrees of both edge vertices and grouping each edge on the vertex with the smaller degree.
+
+#### Usage
+The algorithm takes a directed graph as input (edge directions are ignored) and outputs a `DataSet` of `Tuple3`. The vertex ID type must implement `Comparable`.
+Each `Tuple3` corresponds to one triangle, and its three fields contain the IDs of the vertices forming that triangle.
[Back to top](#top)
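The Triangle Enumerator description above (normalize edges, group each edge on its lower-degree endpoint, build triads, keep only closed triads) can be illustrated without Flink. The following is a minimal in-memory sketch, not Gelly's implementation: the class `TriangleSketch` and its method `enumerate` are hypothetical, edges are plain `long[]` pairs, and degree ties are broken by the smaller vertex ID (an assumption of this sketch).

```java
import java.util.*;

/**
 * Hypothetical in-memory sketch (not Gelly's TriangleEnumerator) of degree-based
 * triangle enumeration. Each undirected edge is assigned to its endpoint with the
 * smaller degree (ties: smaller id, an assumption), triads are built per vertex,
 * and a triad is kept only if the closing edge exists.
 */
public class TriangleSketch {

    /** Returns each unique triangle once, as a sorted triple {a, b, c} with a < b < c. */
    public static List<long[]> enumerate(long[][] edges) {
        // Ignore directions: store every edge as (smaller id, larger id); drops duplicates and self-loops.
        Set<List<Long>> undirected = new HashSet<>();
        for (long[] e : edges) {
            if (e[0] != e[1]) {
                undirected.add(Arrays.asList(Math.min(e[0], e[1]), Math.max(e[0], e[1])));
            }
        }
        // Degree of every vertex in the undirected graph.
        Map<Long, Integer> degree = new HashMap<>();
        for (List<Long> e : undirected) {
            degree.merge(e.get(0), 1, Integer::sum);
            degree.merge(e.get(1), 1, Integer::sum);
        }
        // Group each edge on its lower-degree endpoint; u < v, so ties go to the smaller id u.
        Map<Long, List<Long>> groups = new HashMap<>();
        for (List<Long> e : undirected) {
            long u = e.get(0), v = e.get(1);
            long owner = degree.get(v) < degree.get(u) ? v : u;
            long other = owner == u ? v : u;
            groups.computeIfAbsent(owner, k -> new ArrayList<>()).add(other);
        }
        List<long[]> triangles = new ArrayList<>();
        for (Map.Entry<Long, List<Long>> group : groups.entrySet()) {
            List<Long> nbrs = group.getValue();
            // A group of n edges yields n*(n-1)/2 triads.
            for (int i = 0; i < nbrs.size(); i++) {
                for (int j = i + 1; j < nbrs.size(); j++) {
                    long a = nbrs.get(i), b = nbrs.get(j);
                    // Keep the triad only if the third edge (a, b) closes the triangle.
                    if (undirected.contains(Arrays.asList(Math.min(a, b), Math.max(a, b)))) {
                        long[] t = {group.getKey(), a, b};
                        Arrays.sort(t);
                        triangles.add(t);
                    }
                }
            }
        }
        // Sort for deterministic output.
        triangles.sort(Comparator.<long[]>comparingLong(t -> t[0])
                .thenComparingLong(t -> t[1])
                .thenComparingLong(t -> t[2]));
        return triangles;
    }

    public static void main(String[] args) {
        // Directed input; the reversed duplicate {2, 1} collapses onto {1, 2}.
        long[][] edges = {{1, 2}, {2, 3}, {3, 1}, {3, 4}, {4, 5}, {5, 3}, {2, 4}, {2, 1}};
        for (long[] t : enumerate(edges)) {
            System.out.println(Arrays.toString(t)); // [1, 2, 3], then [2, 3, 4], then [3, 4, 5]
        }
    }
}
```

Because every edge is owned by exactly one endpoint under a total order on (degree, id), each triangle is produced only at its lowest vertex in that order, which is why the output contains no duplicates.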
http://git-wip-us.apache.org/repos/asf/flink/blob/d047ddb4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index ad2ec77..c6bba4c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -40,8 +40,10 @@ import java.util.List;
/**
- * This function returns Dataset of all triangles present in the input graph.
- * A triangle consists of three edges that connect three vertices with each other. Edge directions are ignored here.
+ * This library method enumerates unique triangles present in the input graph.
+ * A triangle consists of three edges that connect three vertices with each other.
+ * Edge directions are ignored here.
+ * The method returns a DataSet of Tuple3, where the fields of each Tuple3 contain the Vertex IDs of a triangle.
* <p>
* <p>
* The basic algorithm works as follows:
@@ -56,8 +58,9 @@ import java.util.List;
* This implementation extends the basic algorithm by computing output degrees of edge vertices and
* grouping on edges on the vertex with the smaller degree.
*/
+public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
+ GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
-public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
@Override
public DataSet<Tuple3<K,K,K>> run(Graph<K, VV, EV> input) throws Exception {
@@ -76,7 +79,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
DataSet<Tuple3<K,K,K>> triangles = edgesByDegree
// build triads
.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
- .reduceGroup(new TriadBuilder())
+ .reduceGroup(new TriadBuilder<K>())
// filter triads
.join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
@@ -86,6 +89,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
/**
* Emits for an edge the original edge and its switched version.
*/
+ @SuppressWarnings("serial")
private static final class EdgeDuplicator<K, EV> implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
@Override
@@ -101,6 +105,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
* Emits one edge for each input edge with a degree annotation for the shared vertex.
* For each emitted edge, the first vertex is the vertex with the smaller id.
*/
+ @SuppressWarnings("serial")
private static final class DegreeCounter<K extends Comparable<K>, EV>
implements GroupReduceFunction<Edge<K, EV>, EdgeWithDegrees<K>> {
@@ -151,6 +156,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
* Builds an edge with degree annotation from two edges that have the same vertices and only one
* degree annotation.
*/
+ @SuppressWarnings("serial")
@FunctionAnnotation.ForwardedFields("0;1")
private static final class DegreeJoiner<K> implements ReduceFunction<EdgeWithDegrees<K>> {
private final EdgeWithDegrees<K> outEdge = new EdgeWithDegrees<>();
@@ -174,6 +180,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
/**
* Projects an edge (pair of vertices) such that the first vertex is the vertex with the smaller degree.
*/
+ @SuppressWarnings("serial")
private static final class EdgeByDegreeProjector<K> implements MapFunction<EdgeWithDegrees<K>, Edge<K, NullValue>> {
private final Edge<K, NullValue> outEdge = new Edge<>();
@@ -199,6 +206,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
/**
* Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second.
*/
+ @SuppressWarnings("serial")
private static final class EdgeByIdProjector<K extends Comparable<K>>
implements MapFunction<Edge<K, NullValue>, Edge<K, NullValue>> {
@@ -219,6 +227,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
* The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId.
* Assumes that input edges share the first vertex and are in ascending order of the second vertex.
*/
+ @SuppressWarnings("serial")
@FunctionAnnotation.ForwardedFields("0")
private static final class TriadBuilder<K extends Comparable<K>>
implements GroupReduceFunction<Edge<K, NullValue>, Triad<K>> {
@@ -256,6 +265,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
/**
* Filters triads (three vertices connected by two edges) without a closing third edge.
*/
+ @SuppressWarnings("serial")
private static final class TriadFilter<K> implements JoinFunction<Triad<K>, Edge<K,NullValue>, Tuple3<K,K,K>> {
@Override
@@ -264,6 +274,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
}
}
+ @SuppressWarnings("serial")
public static final class EdgeWithDegrees<K> extends Tuple4<K, K, Integer, Integer> {
public static final int V1 = 0;
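The class Javadoc above notes that triad building is quadratic in the size of each edge group and that grouping on the vertex with the smaller degree reduces the work. The following plain-Java sketch quantifies this on a small hub-shaped graph. The class `TriadCount` and its methods are hypothetical and not part of Gelly; the sketch assumes that the basic variant groups each edge on its smaller-id endpoint and that degree ties fall back to the smaller id.

```java
import java.util.*;

/**
 * Hypothetical helper, not part of Gelly: counts the triads a triad-building step
 * would create when every undirected edge is grouped on one of its endpoints.
 * A vertex that receives n edges yields n * (n - 1) / 2 triads.
 */
public class TriadCount {

    static long triadsForGroup(long n) {
        return n * (n - 1) / 2;
    }

    /**
     * @param edges    undirected edges, each listed once, without self-loops
     * @param byDegree if true, group each edge on its lower-degree endpoint
     *                 (ties: smaller id); otherwise on its smaller-id endpoint
     */
    static long countTriads(long[][] edges, boolean byDegree) {
        Map<Long, Integer> degree = new HashMap<>();
        for (long[] e : edges) {
            degree.merge(e[0], 1, Integer::sum);
            degree.merge(e[1], 1, Integer::sum);
        }
        // Number of edges assigned to each grouping vertex.
        Map<Long, Long> groupSize = new HashMap<>();
        for (long[] e : edges) {
            long u = Math.min(e[0], e[1]);
            long v = Math.max(e[0], e[1]);
            long owner = (byDegree && degree.get(v) < degree.get(u)) ? v : u;
            groupSize.merge(owner, 1L, Long::sum);
        }
        long total = 0;
        for (long n : groupSize.values()) {
            total += triadsForGroup(n);
        }
        return total;
    }

    public static void main(String[] args) {
        // Hub vertex 1 connected to 2..6, plus the edges 2-3 and 4-5 (two triangles in total).
        long[][] edges = {{1, 2}, {1, 3}, {1, 4}, {1, 5}, {1, 6}, {2, 3}, {4, 5}};
        System.out.println("grouped by smaller id:     " + countTriads(edges, false)); // 10
        System.out.println("grouped by smaller degree: " + countTriads(edges, true));  // 2
    }
}
```

Under the id rule, the hub vertex 1 receives all five of its edges and builds 5 * 4 / 2 = 10 triads, only two of which close. Under the degree rule, each hub edge goes to its low-degree neighbor, so vertices 2 and 4 each build one triad, and both of those close into the triangles {1, 2, 3} and {1, 4, 5}.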
http://git-wip-us.apache.org/repos/asf/flink/blob/d047ddb4/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
index 9f0156f..d06ba30 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
@@ -45,7 +45,7 @@ public class TriangleEnumeratorITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
- env).getUndirected();
+ env);
List<Tuple3<Long,Long,Long>> actualOutput = graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect();
List<Tuple3<Long,Long,Long>> expectedResult = TriangleCountData.getListOfTriangles();