You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/18 11:27:20 UTC
[1/2] flink git commit: [FLINK-2714] [gelly] Copy triangle counting
logic from EnumTrianglesOpt.java to Gelly library. Also reorganizing classes
to use Gelly's Graph APIs.
Repository: flink
Updated Branches:
refs/heads/master 580768c30 -> d047ddb49
[FLINK-2714] [gelly] Copy triangle counting logic from EnumTrianglesOpt.java to Gelly library.
Also reorganizing classes to use Gelly's Graph APIs.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b443cb68
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b443cb68
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b443cb68
Branch: refs/heads/master
Commit: b443cb6835ea39d59c05084f5228fc859a0fd4f5
Parents: 580768c
Author: Saumitra Shahapure <sa...@gmail.com>
Authored: Fri Oct 9 23:51:45 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sun Oct 18 10:51:58 2015 +0200
----------------------------------------------------------------------
.../graph/example/utils/TriangleCountData.java | 9 +
.../flink/graph/library/TriangleEnumerator.java | 345 +++++++++++++++++++
.../test/library/TriangleEnumeratorITCase.java | 58 ++++
3 files changed, 412 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/b443cb68/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
index 5b2cc3d..c8cea12 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.example.utils;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.types.NullValue;
@@ -52,5 +53,13 @@ public class TriangleCountData {
public static final String RESULTED_NUMBER_OF_TRIANGLES = "3";
+	/**
+	 * Returns the triangles expected in the default edge data set of this class,
+	 * each given by its three vertex IDs in ascending order.
+	 *
+	 * @return a new, mutable list holding the three expected triangles
+	 */
+	public static List<Tuple3<Long,Long,Long>> getListOfTriangles() {
+		List<Tuple3<Long,Long,Long>> triangles = new ArrayList<>(3);
+		triangles.add(new Tuple3<>(1L, 2L, 3L));
+		triangles.add(new Tuple3<>(2L, 3L, 6L));
+		triangles.add(new Tuple3<>(3L, 4L, 5L));
+		return triangles;
+	}
+
/** Private constructor: this class only exposes static example data and is not meant to be instantiated. */
private TriangleCountData () {}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/b443cb68/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
new file mode 100644
index 0000000..ad2ec77
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * This library method enumerates the unique triangles present in the input graph.
+ * A triangle consists of three edges that connect three vertices with each other.
+ * Edge directions are ignored, as are self-loops and duplicate (parallel) edges.
+ * The method returns a {@link DataSet} of {@link Tuple3}, where each tuple holds the vertex IDs
+ * of one triangle in ascending order, so that every triangle is emitted exactly once.
+ * <p>
+ * The basic algorithm works as follows:
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
+ * that are connected by two edges. Finally, all triads for which no third edge exists that
+ * closes the triangle are filtered out.
+ * <p>
+ * For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic
+ * <i>((n*(n-1))/2)</i>. Therefore, an optimization of the algorithm is to group edges on the vertex
+ * with the smaller degree to reduce the number of triads.
+ * This implementation extends the basic algorithm by computing the degrees of the edge vertices and
+ * grouping edges on the vertex with the smaller degree (ties are broken by the smaller vertex ID).
+ *
+ * @param <K> the vertex ID type; must be comparable so that vertices can be ordered
+ * @param <VV> the vertex value type (not used by the algorithm)
+ * @param <EV> the edge value type (not used by the algorithm)
+ */
+public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
+
+	/**
+	 * Enumerates the triangles of the given graph.
+	 *
+	 * @param input the graph to analyze; only its edges are read
+	 * @return one {@code Tuple3} per triangle, with the vertex IDs in ascending order
+	 */
+	@Override
+	public DataSet<Tuple3<K,K,K>> run(Graph<K, VV, EV> input) throws Exception {
+
+		DataSet<Edge<K, EV>> edges = input.getEdges();
+
+		// annotate edges with degrees
+		DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<K, EV>())
+				.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<K, EV>())
+				.groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<K>());
+
+		// project edges by degrees (lower-degree vertex first)
+		DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<K>());
+		// project edges by vertex id (smaller id first)
+		DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<K>());
+
+		DataSet<Tuple3<K,K,K>> triangles = edgesByDegree
+				// build triads: group on the source vertex, targets in ascending order
+				.groupBy(0).sortGroup(1, Order.ASCENDING)
+				.reduceGroup(new TriadBuilder<K>())
+				// filter triads: keep only those closed by an edge (second, third)
+				.join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
+
+		return triangles;
+	}
+
+	/**
+	 * Emits for an edge the original edge and its switched version.
+	 */
+	private static final class EdgeDuplicator<K, EV> implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
+			out.collect(edge);
+			out.collect(edge.reverse());
+		}
+	}
+
+	/**
+	 * Counts the number of distinct neighbors of the group vertex, ignoring self-loops.
+	 * Emits one edge per distinct neighbor with a degree annotation for the shared vertex.
+	 * For each emitted edge, the first vertex is the vertex with the smaller id.
+	 * Assumes the edges of a group are sorted by their second vertex.
+	 */
+	private static final class DegreeCounter<K extends Comparable<K>, EV>
+			implements GroupReduceFunction<Edge<K, EV>, EdgeWithDegrees<K>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final ArrayList<K> otherVertices = new ArrayList<K>();
+		private final EdgeWithDegrees<K> outputEdge = new EdgeWithDegrees<>();
+
+		@Override
+		public void reduce(Iterable<Edge<K, EV>> edgesIter, Collector<EdgeWithDegrees<K>> out) {
+
+			otherVertices.clear();
+			K groupVertex = null;
+
+			// Collect the distinct neighbors. Because edges are sorted by their second vertex,
+			// duplicates are adjacent, so comparing with the last collected neighbor suffices
+			// (assumes compareTo is consistent with equals for the vertex ID type).
+			for (Edge<K, EV> edge : edgesIter) {
+				if (groupVertex == null) {
+					groupVertex = edge.getSource();
+				}
+				K otherVertex = edge.getTarget();
+				// skip self-loops; generic keys must be compared by value, never with == / !=
+				if (otherVertex.equals(groupVertex)) {
+					continue;
+				}
+				// skip duplicate (parallel) edges
+				if (!otherVertices.isEmpty()
+						&& otherVertices.get(otherVertices.size() - 1).equals(otherVertex)) {
+					continue;
+				}
+				otherVertices.add(otherVertex);
+			}
+			int degree = otherVertices.size();
+
+			// emit edges; the degree of the other vertex (0 here) is filled in by DegreeJoiner
+			for (K otherVertex : otherVertices) {
+				if (groupVertex.compareTo(otherVertex) < 0) {
+					outputEdge.setFirstVertex(groupVertex);
+					outputEdge.setFirstDegree(degree);
+					outputEdge.setSecondVertex(otherVertex);
+					outputEdge.setSecondDegree(0);
+				} else {
+					outputEdge.setFirstVertex(otherVertex);
+					outputEdge.setFirstDegree(0);
+					outputEdge.setSecondVertex(groupVertex);
+					outputEdge.setSecondDegree(degree);
+				}
+				out.collect(outputEdge);
+			}
+		}
+	}
+
+	/**
+	 * Builds an edge with degree annotation from two edges that have the same vertices and only one
+	 * degree annotation.
+	 */
+	@FunctionAnnotation.ForwardedFields("0;1")
+	private static final class DegreeJoiner<K> implements ReduceFunction<EdgeWithDegrees<K>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final EdgeWithDegrees<K> outEdge = new EdgeWithDegrees<>();
+
+		@Override
+		public EdgeWithDegrees<K> reduce(EdgeWithDegrees<K> edge1, EdgeWithDegrees<K> edge2) throws Exception {
+
+			// copy first edge
+			outEdge.copyFrom(edge1);
+
+			// set missing degree
+			if (edge1.getFirstDegree() == 0 && edge1.getSecondDegree() != 0) {
+				outEdge.setFirstDegree(edge2.getFirstDegree());
+			} else if (edge1.getFirstDegree() != 0 && edge1.getSecondDegree() == 0) {
+				outEdge.setSecondDegree(edge2.getSecondDegree());
+			}
+			return outEdge;
+		}
+	}
+
+	/**
+	 * Projects an edge (pair of vertices) such that the first vertex is the vertex with the smaller degree.
+	 * On equal degrees the input order is kept, in which the vertex with the smaller id comes first.
+	 */
+	private static final class EdgeByDegreeProjector<K> implements MapFunction<EdgeWithDegrees<K>, Edge<K, NullValue>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final Edge<K, NullValue> outEdge = new Edge<>();
+
+		@Override
+		public Edge<K, NullValue> map(EdgeWithDegrees<K> inEdge) throws Exception {
+
+			// Write the vertices in the desired order directly. Edge.reverse() returns a new edge
+			// (see EdgeDuplicator), so calling it without using the result would flip nothing.
+			if (inEdge.getFirstDegree() > inEdge.getSecondDegree()) {
+				outEdge.setSource(inEdge.getSecondVertex());
+				outEdge.setTarget(inEdge.getFirstVertex());
+			} else {
+				outEdge.setSource(inEdge.getFirstVertex());
+				outEdge.setTarget(inEdge.getSecondVertex());
+			}
+			outEdge.setValue(NullValue.getInstance());
+
+			return outEdge;
+		}
+	}
+
+	/**
+	 * Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second.
+	 */
+	private static final class EdgeByIdProjector<K extends Comparable<K>>
+			implements MapFunction<Edge<K, NullValue>, Edge<K, NullValue>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Edge<K, NullValue> map(Edge<K, NullValue> inEdge) throws Exception {
+
+			// flip vertices if the source id is larger; reverse() returns the flipped edge
+			if (inEdge.getSource().compareTo(inEdge.getTarget()) > 0) {
+				return inEdge.reverse();
+			}
+
+			return inEdge;
+		}
+	}
+
+	/**
+	 * Builds triads (triples of vertices) from pairs of edges that share a vertex.
+	 * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId.
+	 * Assumes that input edges share the first vertex and are in ascending order of the second vertex.
+	 */
+	@FunctionAnnotation.ForwardedFields("0")
+	private static final class TriadBuilder<K extends Comparable<K>>
+			implements GroupReduceFunction<Edge<K, NullValue>, Triad<K>> {
+
+		private static final long serialVersionUID = 1L;
+
+		private final List<K> vertices = new ArrayList<>();
+		private final Triad<K> outTriad = new Triad<>();
+
+		@Override
+		public void reduce(Iterable<Edge<K, NullValue>> edgesIter, Collector<Triad<K>> out) throws Exception {
+			final Iterator<Edge<K, NullValue>> edges = edgesIter.iterator();
+
+			// clear vertex list
+			vertices.clear();
+
+			// read first edge
+			Edge<K, NullValue> firstEdge = edges.next();
+			outTriad.setFirstVertex(firstEdge.getSource());
+			vertices.add(firstEdge.getTarget());
+
+			// build and emit triads
+			while (edges.hasNext()) {
+				K higherVertexId = edges.next().getTarget();
+
+				// combine vertex with all previously read vertices
+				for (K lowerVertexId : vertices) {
+					outTriad.setSecondVertex(lowerVertexId);
+					outTriad.setThirdVertex(higherVertexId);
+					out.collect(outTriad);
+				}
+				vertices.add(higherVertexId);
+			}
+		}
+	}
+
+	/**
+	 * Filters triads (three vertices connected by two edges) without a closing third edge, and emits
+	 * each closed triad as a triangle whose vertex IDs are in ascending order.
+	 */
+	private static final class TriadFilter<K extends Comparable<K>>
+			implements JoinFunction<Triad<K>, Edge<K,NullValue>, Tuple3<K,K,K>> {
+
+		private static final long serialVersionUID = 1L;
+
+		@Override
+		public Tuple3<K,K,K> join(Triad<K> triad, Edge<K, NullValue> edge) throws Exception {
+			// The join matches (second, third) against an edge whose source id is smaller than its
+			// target id, so second < third already; only the shared first vertex needs placing.
+			K first = triad.getFirstVertex();
+			K second = triad.getSecondVertex();
+			K third = triad.getThirdVertex();
+			if (first.compareTo(second) < 0) {
+				return new Tuple3<>(first, second, third);
+			} else if (first.compareTo(third) < 0) {
+				return new Tuple3<>(second, first, third);
+			} else {
+				return new Tuple3<>(second, third, first);
+			}
+		}
+	}
+
+	/**
+	 * An edge (pair of vertices) annotated with the degree of each of its vertices.
+	 */
+	public static final class EdgeWithDegrees<K> extends Tuple4<K, K, Integer, Integer> {
+		private static final long serialVersionUID = 1L;
+
+		public static final int V1 = 0;
+		public static final int V2 = 1;
+		public static final int D1 = 2;
+		public static final int D2 = 3;
+
+		public K getFirstVertex() {
+			return this.getField(V1);
+		}
+
+		public K getSecondVertex() {
+			return this.getField(V2);
+		}
+
+		public Integer getFirstDegree() {
+			return this.getField(D1);
+		}
+
+		public Integer getSecondDegree() {
+			return this.getField(D2);
+		}
+
+		public void setFirstVertex(final K vertex1) {
+			this.setField(vertex1, V1);
+		}
+
+		public void setSecondVertex(final K vertex2) {
+			this.setField(vertex2, V2);
+		}
+
+		public void setFirstDegree(final Integer degree1) {
+			this.setField(degree1, D1);
+		}
+
+		public void setSecondDegree(final Integer degree2) {
+			this.setField(degree2, D2);
+		}
+
+		public void copyFrom(final EdgeWithDegrees<K> edge) {
+			this.setFirstVertex(edge.getFirstVertex());
+			this.setSecondVertex(edge.getSecondVertex());
+			this.setFirstDegree(edge.getFirstDegree());
+			this.setSecondDegree(edge.getSecondDegree());
+		}
+	}
+
+	/**
+	 * A triple of vertices: a shared vertex followed by two of its neighbors.
+	 */
+	public static final class Triad<K> extends Tuple3<K, K, K> {
+		private static final long serialVersionUID = 1L;
+
+		public static final int V1 = 0;
+		public static final int V2 = 1;
+		public static final int V3 = 2;
+
+		public K getFirstVertex() {
+			return this.getField(V1);
+		}
+
+		public K getSecondVertex() {
+			return this.getField(V2);
+		}
+
+		public K getThirdVertex() {
+			return this.getField(V3);
+		}
+
+		public void setFirstVertex(final K vertex1) {
+			this.setField(vertex1, V1);
+		}
+
+		public void setSecondVertex(final K vertex2) {
+			this.setField(vertex2, V2);
+		}
+
+		public void setThirdVertex(final K vertex3) {
+			this.setField(vertex3, V3);
+		}
+	}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/b443cb68/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
new file mode 100644
index 0000000..9f0156f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.example.utils.TriangleCountData;
+import org.apache.flink.graph.library.TriangleEnumerator;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TriangleEnumeratorITCase extends MultipleProgramsTestBase {
+
+	public TriangleEnumeratorITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	/**
+	 * Runs the enumerator on the default triangle-count graph and checks that exactly the
+	 * expected triangles are returned: none missing and none duplicated.
+	 */
+	@Test
+	public void testTriangleEnumerator() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
+				env).getUndirected();
+
+		List<Tuple3<Long,Long,Long>> actualOutput = graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect();
+		List<Tuple3<Long,Long,Long>> expectedResult = TriangleCountData.getListOfTriangles();
+
+		// JUnit's assertEquals takes (expected, actual). Equal sizes plus containment of every
+		// (distinct) expected triangle also rules out duplicates masking a missing triangle,
+		// which a per-element lookup in the expected list alone would not detect.
+		Assert.assertEquals(expectedResult.size(), actualOutput.size());
+		Assert.assertTrue("Unexpected triangles: " + actualOutput, actualOutput.containsAll(expectedResult));
+	}
+}
[2/2] flink git commit: [FLINK-2714] [gelly] add the algorithm
description in the gelly docs; update test to get the directed graph as input
Posted by va...@apache.org.
[FLINK-2714] [gelly] add the algorithm description in the gelly docs;
update test to get the directed graph as input
This closes #1250
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d047ddb4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d047ddb4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d047ddb4
Branch: refs/heads/master
Commit: d047ddb494460b3573514d4035341a247fbcb9c0
Parents: b443cb6
Author: vasia <va...@apache.org>
Authored: Sun Oct 18 10:48:01 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sun Oct 18 10:54:59 2015 +0200
----------------------------------------------------------------------
docs/libs/gelly_guide.md | 47 +++++++++++---------
.../flink/graph/library/TriangleEnumerator.java | 19 ++++++--
.../test/library/TriangleEnumeratorITCase.java | 2 +-
3 files changed, 43 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/d047ddb4/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 2c6ce56..13d304d 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -1403,15 +1403,16 @@ Library Methods
-----------
Gelly has a growing collection of graph algorithms for easily analyzing large-scale Graphs. So far, the following library methods are implemented:
-* PageRank
-* Single-Source Shortest Paths
-* Label Propagation
-* Simple Community Detection
-* Connected Components
-* GSA PageRank
-* GSA Connected Components
-* GSA Single-Source Shortest Paths
-* GSA Triangle Count
+* [Community Detection](#community-detection)
+* [Label Propagation](#label-propagation)
+* [Connected Components](#connected-components)
+* [GSA Connected Components](#gsa-connected-components)
+* [PageRank](#pagerank)
+* [GSA PageRank](#gsa-pagerank)
+* [Single Source Shortest Paths](#single-source-shortest-paths)
+* [GSA Single Source Shortest Paths](#gsa-single-source-shortest-paths)
+* [GSA Triangle Count](#gsa-triangle-count)
+* [Triangle Enumerator](#triangle-enumerator)
Gelly's library methods can be used by simply calling the `run()` method on the input graph:
@@ -1423,13 +1424,11 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, Long, NullValue> graph = ...
// run Label Propagation for 30 iterations to detect communities on the input graph
-DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(
- new LabelPropagation<Long>(30)).getVertices();
+DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(new LabelPropagation<Long>(30));
// print the result
verticesWithCommunity.print();
-env.execute();
{% endhighlight %}
</div>
@@ -1445,7 +1444,6 @@ val verticesWithCommunity = graph.run(new LabelPropagation[Long](30))
// print the result
verticesWithCommunity.print
-env.execute
{% endhighlight %}
</div>
</div>
@@ -1473,7 +1471,6 @@ The constructor takes two parameters:
* `maxIterations`: the maximum number of iterations to run.
* `delta`: the hop attenuation parameter, with default value 0.5.
-
### Label Propagation
#### Overview
@@ -1512,14 +1509,12 @@ The constructor takes one parameter:
* `maxIterations`: the maximum number of iterations to run.
-
### GSA Connected Components
The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-apply-iterations).
See the [Connected Components](#connected-components) library method for implementation details and usage information.
-
### PageRank
#### Overview
@@ -1538,14 +1533,12 @@ The constructors take the following parameters:
* `maxIterations`: the maximum number of iterations to run.
* `numVertices`: the number of vertices in the input. If known beforehand, it is advised to provide this argument to speed up execution.
-
### GSA PageRank
The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-apply-iterations).
See the [PageRank](#pagerank) library method for implementation details and usage information.
-
### Single Source Shortest Paths
#### Overview
@@ -1563,14 +1556,12 @@ The constructor takes two parameters:
* `srcVertexId` The vertex ID of the source vertex.
* `maxIterations`: the maximum number of iterations to run.
-
### GSA Single Source Shortest Paths
The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-apply-iterations).
See the [Single Source Shortest Paths](#single-source-shortest-paths) library method for implementation details and usage information.
-
### GSA Triangle Count
#### Overview
@@ -1585,5 +1576,21 @@ Finally, if a node encounters the target ID in the list of received messages, it
The algorithm takes an undirected, unweighted graph as input and outputs a `DataSet` which contains a single integer corresponding to the number of triangles
in the graph. The algorithm constructor takes no arguments.
+### Triangle Enumerator
+
+#### Overview
+This library method enumerates unique triangles present in the input graph. A triangle consists of three edges that connect three vertices with each other.
+This implementation ignores edge directions.
+
+#### Details
+The basic triangle enumeration algorithm groups all edges that share a common vertex and builds triads, i.e., triples of vertices
+that are connected by two edges. Then, all triads for which no third edge exists that closes the triangle are filtered out.
+For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>.
+Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller degree to reduce the number of triads.
+This implementation extends the basic algorithm by computing the degrees of the edge vertices and grouping edges on the vertex with the smaller degree.
+
+#### Usage
+The algorithm takes a graph as input, which may be directed since edge directions are ignored, and outputs a `DataSet` of `Tuple3`.
+The vertex ID type has to be `Comparable`. Each `Tuple3` corresponds to a triangle, with the fields containing the IDs of the vertices forming the triangle.
[Back to top](#top)
http://git-wip-us.apache.org/repos/asf/flink/blob/d047ddb4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index ad2ec77..c6bba4c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -40,8 +40,10 @@ import java.util.List;
/**
- * This function returns Dataset of all triangles present in the input graph.
- * A triangle consists of three edges that connect three vertices with each other. Edge directions are ignored here.
+ * This library method enumerates unique triangles present in the input graph.
+ * A triangle consists of three edges that connect three vertices with each other.
+ * Edge directions are ignored here.
+ * The method returns a DataSet of Tuple3, where the fields of each Tuple3 contain the Vertex IDs of a triangle.
* <p>
* <p>
* The basic algorithm works as follows:
@@ -56,8 +58,9 @@ import java.util.List;
* This implementation extends the basic algorithm by computing output degrees of edge vertices and
* grouping on edges on the vertex with the smaller degree.
*/
+public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
+ GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
-public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
@Override
public DataSet<Tuple3<K,K,K>> run(Graph<K, VV, EV> input) throws Exception {
@@ -76,7 +79,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
DataSet<Tuple3<K,K,K>> triangles = edgesByDegree
// build triads
.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
- .reduceGroup(new TriadBuilder())
+ .reduceGroup(new TriadBuilder<K>())
// filter triads
.join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
@@ -86,6 +89,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
/**
* Emits for an edge the original edge and its switched version.
*/
+ @SuppressWarnings("serial")
private static final class EdgeDuplicator<K, EV> implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
@Override
@@ -101,6 +105,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
* Emits one edge for each input edge with a degree annotation for the shared vertex.
* For each emitted edge, the first vertex is the vertex with the smaller id.
*/
+ @SuppressWarnings("serial")
private static final class DegreeCounter<K extends Comparable<K>, EV>
implements GroupReduceFunction<Edge<K, EV>, EdgeWithDegrees<K>> {
@@ -151,6 +156,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
* Builds an edge with degree annotation from two edges that have the same vertices and only one
* degree annotation.
*/
+ @SuppressWarnings("serial")
@FunctionAnnotation.ForwardedFields("0;1")
private static final class DegreeJoiner<K> implements ReduceFunction<EdgeWithDegrees<K>> {
private final EdgeWithDegrees<K> outEdge = new EdgeWithDegrees<>();
@@ -174,6 +180,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
/**
* Projects an edge (pair of vertices) such that the first vertex is the vertex with the smaller degree.
*/
+ @SuppressWarnings("serial")
private static final class EdgeByDegreeProjector<K> implements MapFunction<EdgeWithDegrees<K>, Edge<K, NullValue>> {
private final Edge<K, NullValue> outEdge = new Edge<>();
@@ -199,6 +206,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
/**
* Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second.
*/
+ @SuppressWarnings("serial")
private static final class EdgeByIdProjector<K extends Comparable<K>>
implements MapFunction<Edge<K, NullValue>, Edge<K, NullValue>> {
@@ -219,6 +227,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
* The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId.
* Assumes that input edges share the first vertex and are in ascending order of the second vertex.
*/
+ @SuppressWarnings("serial")
@FunctionAnnotation.ForwardedFields("0")
private static final class TriadBuilder<K extends Comparable<K>>
implements GroupReduceFunction<Edge<K, NullValue>, Triad<K>> {
@@ -256,6 +265,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
/**
* Filters triads (three vertices connected by two edges) without a closing third edge.
*/
+ @SuppressWarnings("serial")
private static final class TriadFilter<K> implements JoinFunction<Triad<K>, Edge<K,NullValue>, Tuple3<K,K,K>> {
@Override
@@ -264,6 +274,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
}
}
+ @SuppressWarnings("serial")
public static final class EdgeWithDegrees<K> extends Tuple4<K, K, Integer, Integer> {
public static final int V1 = 0;
http://git-wip-us.apache.org/repos/asf/flink/blob/d047ddb4/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
index 9f0156f..d06ba30 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
@@ -45,7 +45,7 @@ public class TriangleEnumeratorITCase extends MultipleProgramsTestBase {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
- env).getUndirected();
+ env);
List<Tuple3<Long,Long,Long>> actualOutput = graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect();
List<Tuple3<Long,Long,Long>> expectedResult = TriangleCountData.getListOfTriangles();