You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/18 11:27:20 UTC

[1/2] flink git commit: [FLINK-2714] [gelly] Copy triangle counting logic from EnumTrianglesOpt.java to Gelly library. Also reorganizing classes to use Gelly's Graph APIs.

Repository: flink
Updated Branches:
  refs/heads/master 580768c30 -> d047ddb49


[FLINK-2714] [gelly] Copy triangle counting logic from EnumTrianglesOpt.java to Gelly library.
Also reorganizing classes to use Gelly's Graph APIs.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b443cb68
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b443cb68
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b443cb68

Branch: refs/heads/master
Commit: b443cb6835ea39d59c05084f5228fc859a0fd4f5
Parents: 580768c
Author: Saumitra Shahapure <sa...@gmail.com>
Authored: Fri Oct 9 23:51:45 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sun Oct 18 10:51:58 2015 +0200

----------------------------------------------------------------------
 .../graph/example/utils/TriangleCountData.java  |   9 +
 .../flink/graph/library/TriangleEnumerator.java | 345 +++++++++++++++++++
 .../test/library/TriangleEnumeratorITCase.java  |  58 ++++
 3 files changed, 412 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b443cb68/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
index 5b2cc3d..c8cea12 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.example.utils;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.types.NullValue;
 
@@ -52,5 +53,13 @@ public class TriangleCountData {
 
 	public static final String RESULTED_NUMBER_OF_TRIANGLES = "3";
 
+	public static List<Tuple3<Long,Long,Long>> getListOfTriangles()	{
+		ArrayList<Tuple3<Long,Long,Long>> ret = new ArrayList<>(3);
+		ret.add(new Tuple3<>(1L,2L,3L));
+		ret.add(new Tuple3<>(2L,3L,6L));
+		ret.add(new Tuple3<>(3L,4L,5L));
+		return ret;
+	}
+
 	private TriangleCountData () {}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b443cb68/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
new file mode 100644
index 0000000..ad2ec77
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *	 http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+
+
+/**
+ * This function returns Dataset of all triangles present in the input graph.
+ * A triangle consists of three edges that connect three vertices with each other.  Edge directions are ignored here.
+ * <p>
+ * <p>
+ * The basic algorithm works as follows:
+ * It groups all edges that share a common vertex and builds triads, i.e., triples of vertices
+ * that are connected by two edges. Finally, all triads are filtered for which no third edge exists
+ * that closes the triangle.
+ * <p>
+ * <p>
+ * For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>.
+ * Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to
+ * reduce the number of triads.
+ * This implementation extends the basic algorithm by computing output degrees of edge vertices and
+ * grouping on edges on the vertex with the smaller degree.
+ */
+
+public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
+	@Override
+	public DataSet<Tuple3<K,K,K>> run(Graph<K, VV, EV> input) throws Exception {
+
+		DataSet<Edge<K, EV>> edges = input.getEdges();
+
+		// annotate edges with degrees
+		DataSet<EdgeWithDegrees<K>> edgesWithDegrees = edges.flatMap(new EdgeDuplicator<K, EV>())
+				.groupBy(0).sortGroup(1, Order.ASCENDING).reduceGroup(new DegreeCounter<K, EV>())
+				.groupBy(EdgeWithDegrees.V1, EdgeWithDegrees.V2).reduce(new DegreeJoiner<K>());
+
+		// project edges by degrees
+		DataSet<Edge<K, NullValue>> edgesByDegree = edgesWithDegrees.map(new EdgeByDegreeProjector<K>());
+		// project edges by vertex id
+		DataSet<Edge<K, NullValue>> edgesById = edgesByDegree.map(new EdgeByIdProjector<K>());
+
+		DataSet<Tuple3<K,K,K>> triangles = edgesByDegree
+				// build triads
+				.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
+				.reduceGroup(new TriadBuilder())
+				// filter triads
+				.join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
+
+		return triangles;
+	}
+
+	/**
+	 * Emits for an edge the original edge and its switched version.
+	 */
+	private static final class EdgeDuplicator<K, EV> implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
+
+		@Override
+		public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
+			out.collect(edge);
+			Edge<K, EV> reversed = edge.reverse();
+			out.collect(reversed);
+		}
+	}
+
+	/**
+	 * Counts the number of edges that share a common vertex.
+	 * Emits one edge for each input edge with a degree annotation for the shared vertex.
+	 * For each emitted edge, the first vertex is the vertex with the smaller id.
+	 */
+	private static final class DegreeCounter<K extends Comparable<K>, EV>
+			implements GroupReduceFunction<Edge<K, EV>, EdgeWithDegrees<K>> {
+
+		final ArrayList<K> otherVertices = new ArrayList<K>();
+		final EdgeWithDegrees<K> outputEdge = new EdgeWithDegrees<>();
+
+		@Override
+		public void reduce(Iterable<Edge<K, EV>> edgesIter, Collector<EdgeWithDegrees<K>> out) {
+
+			Iterator<Edge<K, EV>> edges = edgesIter.iterator();
+			otherVertices.clear();
+
+			// get first edge
+			Edge<K, EV> edge = edges.next();
+			K groupVertex = edge.getSource();
+			this.otherVertices.add(edge.getTarget());
+
+			// get all other edges (assumes edges are sorted by second vertex)
+			while (edges.hasNext()) {
+				edge = edges.next();
+				K otherVertex = edge.getTarget();
+				// collect unique vertices
+				if (!otherVertices.contains(otherVertex) && otherVertex != groupVertex) {
+					this.otherVertices.add(otherVertex);
+				}
+			}
+			int degree = this.otherVertices.size();
+
+			// emit edges
+			for (K otherVertex : this.otherVertices) {
+				if (groupVertex.compareTo(otherVertex) < 0) {
+					outputEdge.setFirstVertex(groupVertex);
+					outputEdge.setFirstDegree(degree);
+					outputEdge.setSecondVertex(otherVertex);
+					outputEdge.setSecondDegree(0);
+				} else {
+					outputEdge.setFirstVertex(otherVertex);
+					outputEdge.setFirstDegree(0);
+					outputEdge.setSecondVertex(groupVertex);
+					outputEdge.setSecondDegree(degree);
+				}
+				out.collect(outputEdge);
+			}
+		}
+	}
+
+	/**
+	 * Builds an edge with degree annotation from two edges that have the same vertices and only one
+	 * degree annotation.
+	 */
+	@FunctionAnnotation.ForwardedFields("0;1")
+	private static final class DegreeJoiner<K> implements ReduceFunction<EdgeWithDegrees<K>> {
+		private final EdgeWithDegrees<K> outEdge = new EdgeWithDegrees<>();
+
+		@Override
+		public EdgeWithDegrees<K> reduce(EdgeWithDegrees<K> edge1, EdgeWithDegrees<K> edge2) throws Exception {
+
+			// copy first edge
+		/*\t*/outEdge.copyFrom(edge1);
+
+			// set missing degree
+			if (edge1.getFirstDegree() == 0 && edge1.getSecondDegree() != 0) {
+				outEdge.setFirstDegree(edge2.getFirstDegree());
+			} else if (edge1.getFirstDegree() != 0 && edge1.getSecondDegree() == 0) {
+				outEdge.setSecondDegree(edge2.getSecondDegree());
+			}
+			return outEdge;
+		}
+	}
+
+	/**
+	 * Projects an edge (pair of vertices) such that the first vertex is the vertex with the smaller degree.
+	 */
+	private static final class EdgeByDegreeProjector<K> implements MapFunction<EdgeWithDegrees<K>, Edge<K, NullValue>> {
+
+		private final Edge<K, NullValue> outEdge = new Edge<>();
+
+		@Override
+		public Edge<K, NullValue> map(EdgeWithDegrees<K> inEdge) throws Exception {
+
+			// copy vertices to simple edge
+			outEdge.setSource(inEdge.getFirstVertex());
+			outEdge.setTarget(inEdge.getSecondVertex());
+			outEdge.setValue(NullValue.getInstance());
+
+			// flip vertices if first degree is larger than second degree.
+			if (inEdge.getFirstDegree() > inEdge.getSecondDegree()) {
+				outEdge.reverse();
+			}
+
+			// return edge
+			return outEdge;
+		}
+	}
+
+	/**
+	 * Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second.
+	 */
+	private static final class EdgeByIdProjector<K extends Comparable<K>>
+			implements MapFunction<Edge<K, NullValue>, Edge<K, NullValue>> {
+
+		@Override
+		public Edge<K, NullValue> map(Edge<K, NullValue> inEdge) throws Exception {
+
+			// flip vertices if necessary
+			if (inEdge.getSource().compareTo(inEdge.getTarget()) < 0) {
+				inEdge.reverse();
+			}
+
+			return inEdge;
+		}
+	}
+
+	/**
+	 * Builds triads (triples of vertices) from pairs of edges that share a vertex.
+	 * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId.
+	 * Assumes that input edges share the first vertex and are in ascending order of the second vertex.
+	 */
+	@FunctionAnnotation.ForwardedFields("0")
+	private static final class TriadBuilder<K extends Comparable<K>>
+			implements GroupReduceFunction<Edge<K, NullValue>, Triad<K>> {
+
+		private final List<K> vertices = new ArrayList<>();
+		private final Triad<K> outTriad = new Triad<>();
+
+		@Override
+		public void reduce(Iterable<Edge<K, NullValue>> edgesIter, Collector<Triad<K>> out) throws Exception {
+			final Iterator<Edge<K, NullValue>> edges = edgesIter.iterator();
+
+			// clear vertex list
+			vertices.clear();
+
+			// read first edge
+			Edge<K, NullValue> firstEdge = edges.next();
+			outTriad.setFirstVertex(firstEdge.getSource());
+			vertices.add(firstEdge.getTarget());
+
+			// build and emit triads
+			while (edges.hasNext()) {
+				K higherVertexId = edges.next().getTarget();
+
+				// combine vertex with all previously read vertices
+				for (K lowerVertexId : vertices) {
+					outTriad.setSecondVertex(lowerVertexId);
+					outTriad.setThirdVertex(higherVertexId);
+					out.collect(outTriad);
+				}
+				vertices.add(higherVertexId);
+			}
+		}
+	}
+
+	/**
+	 * Filters triads (three vertices connected by two edges) without a closing third edge.
+	 */
+	private static final class TriadFilter<K> implements JoinFunction<Triad<K>, Edge<K,NullValue>, Tuple3<K,K,K>> {
+
+		@Override
+		public Tuple3<K,K,K> join(Triad<K> triad, Edge<K, NullValue> edge) throws Exception {
+			return new Tuple3<>(triad.getFirstVertex(), triad.getSecondVertex(), triad.getThirdVertex());
+		}
+	}
+
+	public static final class EdgeWithDegrees<K> extends Tuple4<K, K, Integer, Integer> {
+
+		public static final int V1 = 0;
+		public static final int V2 = 1;
+		public static final int D1 = 2;
+		public static final int D2 = 3;
+
+		public K getFirstVertex() {
+			return this.getField(V1);
+		}
+
+		public K getSecondVertex() {
+			return this.getField(V2);
+		}
+
+		public Integer getFirstDegree() {
+			return this.getField(D1);
+		}
+
+		public Integer getSecondDegree() {
+			return this.getField(D2);
+		}
+
+		public void setFirstVertex(final K vertex1) {
+			this.setField(vertex1, V1);
+		}
+
+		public void setSecondVertex(final K vertex2) {
+			this.setField(vertex2, V2);
+		}
+
+		public void setFirstDegree(final Integer degree1) {
+			this.setField(degree1, D1);
+		}
+
+		public void setSecondDegree(final Integer degree2) {
+			this.setField(degree2, D2);
+		}
+
+		public void copyFrom(final EdgeWithDegrees<K> edge) {
+			this.setFirstVertex(edge.getFirstVertex());
+			this.setSecondVertex(edge.getSecondVertex());
+			this.setFirstDegree(edge.getFirstDegree());
+			this.setSecondDegree(edge.getSecondDegree());
+		}
+	}
+
+	public static final class Triad<K> extends Tuple3<K, K, K> {
+		private static final long serialVersionUID = 1L;
+
+		public static final int V1 = 0;
+		public static final int V2 = 1;
+		public static final int V3 = 2;
+
+		public K getFirstVertex() {
+			return this.getField(V1);
+		}
+
+		public K getSecondVertex() {
+			return this.getField(V2);
+		}
+
+		public K getThirdVertex() {
+			return this.getField(V3);
+		}
+
+		public void setFirstVertex(final K vertex1) {
+			this.setField(vertex1, V1);
+		}
+
+		public void setSecondVertex(final K vertex2) {
+			this.setField(vertex2, V2);
+		}
+
+		public void setThirdVertex(final K vertex3) {
+			this.setField(vertex3, V3);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/b443cb68/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
new file mode 100644
index 0000000..9f0156f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.example.utils.TriangleCountData;
+import org.apache.flink.graph.library.TriangleEnumerator;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TriangleEnumeratorITCase extends MultipleProgramsTestBase {
+
+	public TriangleEnumeratorITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testTriangleEnumerator() throws Exception	{
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
+				env).getUndirected();
+
+		List<Tuple3<Long,Long,Long>> actualOutput = graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect();
+		List<Tuple3<Long,Long,Long>>  expectedResult = TriangleCountData.getListOfTriangles();
+
+		Assert.assertEquals(actualOutput.size(), expectedResult.size());
+		for(Tuple3<Long,Long,Long> resultTriangle:actualOutput)	{
+			Assert.assertTrue(expectedResult.indexOf(resultTriangle)>=0);
+		}
+	}
+}


[2/2] flink git commit: [FLINK-2714] [gelly] add the algorithm description in the gelly docs; update test to get the directed graph as input

Posted by va...@apache.org.
[FLINK-2714] [gelly] add the algorithm description in the gelly docs;
update test to get the directed graph as input

This closes #1250


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d047ddb4
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d047ddb4
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d047ddb4

Branch: refs/heads/master
Commit: d047ddb494460b3573514d4035341a247fbcb9c0
Parents: b443cb6
Author: vasia <va...@apache.org>
Authored: Sun Oct 18 10:48:01 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sun Oct 18 10:54:59 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        | 47 +++++++++++---------
 .../flink/graph/library/TriangleEnumerator.java | 19 ++++++--
 .../test/library/TriangleEnumeratorITCase.java  |  2 +-
 3 files changed, 43 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d047ddb4/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 2c6ce56..13d304d 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -1403,15 +1403,16 @@ Library Methods
 -----------
 Gelly has a growing collection of graph algorithms for easily analyzing large-scale Graphs. So far, the following library methods are implemented:
 
-* PageRank
-* Single-Source Shortest Paths
-* Label Propagation
-* Simple Community Detection
-* Connected Components
-* GSA PageRank
-* GSA Connected Components
-* GSA Single-Source Shortest Paths
-* GSA Triangle Count
+* [Community Detection](#community-detection)
+* [Label Propagation](#label-propagation)
+* [Connected Components](#connected-components)
+* [GSA Connected Components](#gsa-connected-components)
+* [PageRank](#pagerank)
+* [GSA PageRank](#gsa-pagerank)
+* [Single Source Shortest Paths](#single-source-shortest-paths)
+* [GSA Single Source Shortest Paths](#gsa-single-source-shortest-paths)
+* [GSA Triangle Count](#gsa-triangle-count)
+* [Triangle Enumerator](#triangle-enumerator)
 
 Gelly's library methods can be used by simply calling the `run()` method on the input graph:
 
@@ -1423,13 +1424,11 @@ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 Graph<Long, Long, NullValue> graph = ...
 
 // run Label Propagation for 30 iterations to detect communities on the input graph
-DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(
-				new LabelPropagation<Long>(30)).getVertices();
+DataSet<Vertex<Long, Long>> verticesWithCommunity = graph.run(new LabelPropagation<Long>(30));
 
 // print the result
 verticesWithCommunity.print();
 
-env.execute();
 {% endhighlight %}
 </div>
 
@@ -1445,7 +1444,6 @@ val verticesWithCommunity = graph.run(new LabelPropagation[Long](30))
 // print the result
 verticesWithCommunity.print
 
-env.execute
 {% endhighlight %}
 </div>
 </div>
@@ -1473,7 +1471,6 @@ The constructor takes two parameters:
 * `maxIterations`: the maximum number of iterations to run.
 * `delta`: the hop attenuation parameter, with default value 0.5.
 
-
 ### Label Propagation
 
 #### Overview
@@ -1512,14 +1509,12 @@ The constructor takes one parameter:
 
 * `maxIterations`: the maximum number of iterations to run.
 
-
 ### GSA Connected Components
 
 The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-apply-iterations).
 
 See the [Connected Components](#connected-components) library method for implementation details and usage information.
 
-
 ### PageRank
 
 #### Overview
@@ -1538,14 +1533,12 @@ The constructors take the following parameters:
 * `maxIterations`: the maximum number of iterations to run.
 * `numVertices`: the number of vertices in the input. If known beforehand, is it advised to provide this argument to speed up execution.
 
-
 ### GSA PageRank
 
 The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-apply-iterations).
 
 See the [PageRank](#pagerank) library method for implementation details and usage information.
 
-
 ### Single Source Shortest Paths
 
 #### Overview
@@ -1563,14 +1556,12 @@ The constructor takes two parameters:
 * `srcVertexId` The vertex ID of the source vertex.
 * `maxIterations`: the maximum number of iterations to run.
 
-
 ### GSA Single Source Shortest Paths
 
 The algorithm is implemented using [gather-sum-apply iterations](#gather-sum-apply-iterations).
 
 See the [Single Source Shortest Paths](#single-source-shortest-paths) library method for implementation details and usage information.
 
-
 ### GSA Triangle Count
 
 #### Overview
@@ -1585,5 +1576,21 @@ Finally, if a node encounters the target ID in the list of received messages, it
 The algorithm takes an undirected, unweighted graph as input and outputs a `DataSet` which contains a single integer corresponding to the number of triangles
 in the graph. The algorithm constructor takes no arguments.
 
+### Triangle Enumerator
+
+#### Overview
+This library method enumerates unique triangles present in the input graph. A triangle consists of three edges that connect three vertices with each other.
+This implementation ignores edge directions.
+
+#### Details
+The basic triangle enumeration algorithm groups all edges that share a common vertex and builds triads, i.e., triples of vertices
+that are connected by two edges. Then, all triads are filtered for which no third edge exists that closes the triangle.
+For a group of <i>n</i> edges that share a common vertex, the number of built triads is quadratic <i>((n*(n-1))/2)</i>.
+Therefore, an optimization of the algorithm is to group edges on the vertex with the smaller output degree to reduce the number of triads.
+This implementation extends the basic algorithm by computing output degrees of edge vertices and grouping on edges on the vertex with the smaller degree.
+
+#### Usage
+The algorithm takes a directed graph as input and outputs a `DataSet` of `Tuple3`. The Vertex ID type has to be `Comparable`.
+Each `Tuple3` corresponds to a triangle, with the fields containing the IDs of the vertices forming the triangle.
 
 [Back to top](#top)

http://git-wip-us.apache.org/repos/asf/flink/blob/d047ddb4/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
index ad2ec77..c6bba4c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/TriangleEnumerator.java
@@ -40,8 +40,10 @@ import java.util.List;
 
 
 /**
- * This function returns Dataset of all triangles present in the input graph.
- * A triangle consists of three edges that connect three vertices with each other.  Edge directions are ignored here.
+ * This library method enumerates unique triangles present in the input graph.
+ * A triangle consists of three edges that connect three vertices with each other.
+ * Edge directions are ignored here.
+ * The method returns a DataSet of Tuple3, where the fields of each Tuple3 contain the Vertex IDs of a triangle.
  * <p>
  * <p>
  * The basic algorithm works as follows:
@@ -56,8 +58,9 @@ import java.util.List;
  * This implementation extends the basic algorithm by computing output degrees of edge vertices and
  * grouping on edges on the vertex with the smaller degree.
  */
+public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements
+	GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
 
-public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K,K,K>>> {
 	@Override
 	public DataSet<Tuple3<K,K,K>> run(Graph<K, VV, EV> input) throws Exception {
 
@@ -76,7 +79,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
 		DataSet<Tuple3<K,K,K>> triangles = edgesByDegree
 				// build triads
 				.groupBy(EdgeWithDegrees.V1).sortGroup(EdgeWithDegrees.V2, Order.ASCENDING)
-				.reduceGroup(new TriadBuilder())
+				.reduceGroup(new TriadBuilder<K>())
 				// filter triads
 				.join(edgesById).where(Triad.V2, Triad.V3).equalTo(0, 1).with(new TriadFilter<K>());
 
@@ -86,6 +89,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
 	/**
 	 * Emits for an edge the original edge and its switched version.
 	 */
+	@SuppressWarnings("serial")
 	private static final class EdgeDuplicator<K, EV> implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
 
 		@Override
@@ -101,6 +105,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
 	 * Emits one edge for each input edge with a degree annotation for the shared vertex.
 	 * For each emitted edge, the first vertex is the vertex with the smaller id.
 	 */
+	@SuppressWarnings("serial")
 	private static final class DegreeCounter<K extends Comparable<K>, EV>
 			implements GroupReduceFunction<Edge<K, EV>, EdgeWithDegrees<K>> {
 
@@ -151,6 +156,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
 	 * Builds an edge with degree annotation from two edges that have the same vertices and only one
 	 * degree annotation.
 	 */
+	@SuppressWarnings("serial")
 	@FunctionAnnotation.ForwardedFields("0;1")
 	private static final class DegreeJoiner<K> implements ReduceFunction<EdgeWithDegrees<K>> {
 		private final EdgeWithDegrees<K> outEdge = new EdgeWithDegrees<>();
@@ -174,6 +180,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
 	/**
 	 * Projects an edge (pair of vertices) such that the first vertex is the vertex with the smaller degree.
 	 */
+	@SuppressWarnings("serial")
 	private static final class EdgeByDegreeProjector<K> implements MapFunction<EdgeWithDegrees<K>, Edge<K, NullValue>> {
 
 		private final Edge<K, NullValue> outEdge = new Edge<>();
@@ -199,6 +206,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
 	/**
 	 * Projects an edge (pair of vertices) such that the id of the first is smaller than the id of the second.
 	 */
+	@SuppressWarnings("serial")
 	private static final class EdgeByIdProjector<K extends Comparable<K>>
 			implements MapFunction<Edge<K, NullValue>, Edge<K, NullValue>> {
 
@@ -219,6 +227,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
 	 * The first vertex of a triad is the shared vertex, the second and third vertex are ordered by vertexId.
 	 * Assumes that input edges share the first vertex and are in ascending order of the second vertex.
 	 */
+	@SuppressWarnings("serial")
 	@FunctionAnnotation.ForwardedFields("0")
 	private static final class TriadBuilder<K extends Comparable<K>>
 			implements GroupReduceFunction<Edge<K, NullValue>, Triad<K>> {
@@ -256,6 +265,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
 	/**
 	 * Filters triads (three vertices connected by two edges) without a closing third edge.
 	 */
+	@SuppressWarnings("serial")
 	private static final class TriadFilter<K> implements JoinFunction<Triad<K>, Edge<K,NullValue>, Tuple3<K,K,K>> {
 
 		@Override
@@ -264,6 +274,7 @@ public class TriangleEnumerator<K extends Comparable<K>, VV, EV> implements Grap
 		}
 	}
 
+	@SuppressWarnings("serial")
 	public static final class EdgeWithDegrees<K> extends Tuple4<K, K, Integer, Integer> {
 
 		public static final int V1 = 0;

http://git-wip-us.apache.org/repos/asf/flink/blob/d047ddb4/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
index 9f0156f..d06ba30 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleEnumeratorITCase.java
@@ -45,7 +45,7 @@ public class TriangleEnumeratorITCase extends MultipleProgramsTestBase {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
-				env).getUndirected();
+				env);
 
 		List<Tuple3<Long,Long,Long>> actualOutput = graph.run(new TriangleEnumerator<Long, NullValue, NullValue>()).collect();
 		List<Tuple3<Long,Long,Long>>  expectedResult = TriangleCountData.getListOfTriangles();