You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by an...@apache.org on 2015/09/08 11:52:02 UTC

flink git commit: [FLINK-2570] [gelly] Added a Triangle Count Library Method

Repository: flink
Updated Branches:
  refs/heads/master 948b6e057 -> 97fb9a47c


[FLINK-2570] [gelly] Added a Triangle Count Library Method

[FLINK-2570] [gelly] Added a description of the I/O

This closes #1054


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97fb9a47
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97fb9a47
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97fb9a47

Branch: refs/heads/master
Commit: 97fb9a47c6e7efa33b84b10fcc5454189002f6f3
Parents: 948b6e0
Author: Andra Lungu <lu...@gmail.com>
Authored: Tue Aug 25 14:07:18 2015 +0200
Committer: Andra Lungu <lu...@gmail.com>
Committed: Tue Sep 8 11:50:30 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |   4 +
 .../example/utils/CommunityDetectionData.java   |   2 +-
 .../example/utils/LabelPropagationData.java     |   4 +
 .../graph/example/utils/MusicProfilesData.java  |   4 +
 .../flink/graph/example/utils/PageRankData.java |   4 +
 .../graph/example/utils/TriangleCountData.java  |  56 ++++++
 .../flink/graph/library/GSATriangleCount.java   | 189 +++++++++++++++++++
 .../graph/test/library/TriangleCountITCase.java |  56 ++++++
 8 files changed, 318 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index e0e05b3..562df31 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -800,6 +800,10 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc
 * Label Propagation
 * Simple Community Detection
 * Connected Components
+* GSA PageRank
+* GSA Connected Components
+* GSA Single-Source Shortest Paths
+* GSA Triangle Count
 
 Gelly's library methods can be used by simply calling the `run()` method on the input graph:
 

http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
index 196de3a..c37b2b5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
@@ -26,7 +26,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 /**
- * Provides the default data set used for the Simple Community Detection example program.
+ * Provides the default data set used for the Simple Community Detection test program.
  * If no parameters are given to the program, the default edge data set is used.
  */
 public class CommunityDetectionData {

http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
index b70a9c4..0a92097 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
@@ -27,6 +27,10 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.NullValue;
 
+/**
+ * Provides the default data set used for the Label Propagation test program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
 public class LabelPropagationData {
 	
 	public static final String LABELS_AFTER_1_ITERATION = "1,10\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
index 6b96372..3a97d1f 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
@@ -25,6 +25,10 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple3;
 
+/**
+ * Provides the default data sets used for the Music Profiles example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
 public class MusicProfilesData {
 
 	public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env) {

http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
index c84808a..58d4f8b 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
@@ -25,6 +25,10 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Edge;
 
+/**
+ * Provides the default data set used for the PageRank test program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
 public class PageRankData {
 	
 	public static final String EDGES = "2	1\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
new file mode 100644
index 0000000..5b2cc3d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default edge data set used for the Triangle Count test program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
+public class TriangleCountData {
+
+	public static final String EDGES = "1	2\n"+"1	3\n"+"2	3\n"+"2	6\n"+"3	4\n"+"3	5\n"+"3	6\n"+"4	5\n"+"6	7\n";
+
+	public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, NullValue>> edges = new ArrayList<Edge<Long, NullValue>>();
+		edges.add(new Edge<Long, NullValue>(1L, 2L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(2L, 3L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(2L, 6L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(3L, 4L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(3L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(3L, 6L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(4L, 5L, NullValue.getInstance()));
+		edges.add(new Edge<Long, NullValue>(6L, 7L, NullValue.getInstance()));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final String RESULTED_NUMBER_OF_TRIANGLES = "3";
+
+	private TriangleCountData () {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
new file mode 100644
index 0000000..95c70bf
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.ReduceNeighborsFunction;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.NullValue;
+
+import java.util.TreeMap;
+
+/**
+ * Triangle Count Algorithm.
+ *
+ * This algorithm operates in three phases. First, vertices select neighbors with id greater than theirs
+ * and send messages to them. Each received message is then propagated to neighbors with higher id.
+ * Finally, if a node encounters the target id in the list of received messages, it increments the number
+ * of triangles found.
+ *
+ * This implementation is non-iterative.
+ *
+ * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet of
+ * Tuple1 which contains a single integer representing the number of triangles.
+ */
+public class GSATriangleCount implements
+		GraphAlgorithm<Long, NullValue, NullValue, DataSet<Tuple1<Integer>>> {
+
+	@Override
+	public DataSet<Tuple1<Integer>> run(Graph<Long, NullValue, NullValue> input) throws Exception {
+
+		ExecutionEnvironment env = input.getContext();
+
+		// orient each edge so that its source id is never lower than its target id, then drop duplicate edges
+		DataSet<Edge<Long, NullValue>> edges = input.getEdges()
+				.map(new OrderEdges()).distinct();
+
+		Graph<Long, TreeMap<Long, Integer>, NullValue> graph = Graph.fromDataSet(edges,
+				new VertexInitializer(), env);
+
+		// select neighbors with ids higher than the current vertex id
+		// Gather: a no-op in this case
+		// Sum: create the set of neighbors
+		DataSet<Tuple2<Long, TreeMap<Long, Integer>>> higherIdNeighbors =
+				graph.reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN);
+
+		Graph<Long, TreeMap<Long, Integer>, NullValue> graphWithReinitializedVertexValues =
+				graph.mapVertices(new VertexInitializerEmptyTreeMap());
+
+		// Apply: attach the computed values to the vertices
+		// joinWithVertices to update the node values
+		DataSet<Vertex<Long, TreeMap<Long, Integer>>> verticesWithHigherIdNeighbors =
+				graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues()).getVertices();
+
+		Graph<Long, TreeMap<Long,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors,
+				edges, env);
+
+		// propagate each received value to neighbors with higher id
+		// Gather: a no-op in this case
+		// Sum: propagate values
+		DataSet<Tuple2<Long, TreeMap<Long, Integer>>> propagatedValues = graphWithNeighbors
+				.reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN);
+
+		// Apply: attach propagated values to vertices
+		DataSet<Vertex<Long, TreeMap<Long, Integer>>> verticesWithPropagatedValues =
+				graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues()).getVertices();
+
+		Graph<Long, TreeMap<Long, Integer>, NullValue> graphWithPropagatedNeighbors =
+				Graph.fromDataSet(verticesWithPropagatedValues, graphWithNeighbors.getEdges(), env);
+
+		// Scatter: compute the number of triangles
+		DataSet<Tuple1<Integer>> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets()
+				.map(new ComputeTriangles()).reduce(new ReduceFunction<Tuple1<Integer>>() {
+
+					@Override
+					public Tuple1<Integer> reduce(Tuple1<Integer> firstTuple, Tuple1<Integer> secondTuple) throws Exception {
+						return new Tuple1<Integer>(firstTuple.f0 + secondTuple.f0);
+					}
+				});
+
+		return numberOfTriangles;
+	}
+
+	@SuppressWarnings("serial")
+	private static final class OrderEdges implements MapFunction<Edge<Long, NullValue>, Edge<Long, NullValue>> {
+
+		@Override
+		public Edge<Long, NullValue> map(Edge<Long, NullValue> edge) throws Exception {
+			if (edge.getSource() < edge.getTarget()) {
+				return new Edge<Long, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance());
+			} else {
+				return edge;
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class VertexInitializer implements MapFunction<Long, TreeMap<Long, Integer>> {
+
+		@Override
+		public TreeMap<Long, Integer> map(Long value) throws Exception {
+			TreeMap<Long, Integer> neighbors = new TreeMap<Long, Integer>();
+			neighbors.put(value, 1);
+
+			return neighbors;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class VertexInitializerEmptyTreeMap implements
+			MapFunction<Vertex<Long, TreeMap<Long, Integer>>, TreeMap<Long, Integer>> {
+
+		@Override
+		public TreeMap<Long, Integer> map(Vertex<Long, TreeMap<Long, Integer>> vertex) throws Exception {
+			return new TreeMap<Long, Integer>();
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class AttachValues implements MapFunction<Tuple2<TreeMap<Long, Integer>,
+			TreeMap<Long, Integer>>, TreeMap<Long, Integer>> {
+
+		@Override
+		public TreeMap<Long, Integer> map(Tuple2<TreeMap<Long, Integer>, TreeMap<Long, Integer>> tuple2) throws Exception {
+			return tuple2.f1;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class GatherHigherIdNeighbors implements ReduceNeighborsFunction<TreeMap<Long,Integer>> {
+
+		@Override
+		public TreeMap<Long,Integer> reduceNeighbors(TreeMap<Long,Integer> first, TreeMap<Long,Integer> second) {
+			for (Long key : second.keySet()) {
+				Integer value = first.get(key);
+				if (value != null) {
+					first.put(key, value + second.get(key));
+				} else {
+					first.put(key, second.get(key));
+				}
+			}
+			return first;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class ComputeTriangles implements MapFunction<Triplet<Long, TreeMap<Long, Integer>, NullValue>,
+			Tuple1<Integer>> {
+
+		@Override
+		public Tuple1<Integer> map(Triplet<Long, TreeMap<Long, Integer>, NullValue> triplet) throws Exception {
+
+			Vertex<Long, TreeMap<Long, Integer>> srcVertex = triplet.getSrcVertex();
+			Vertex<Long, TreeMap<Long, Integer>> trgVertex = triplet.getTrgVertex();
+			int triangles = 0;
+
+			if(trgVertex.getValue().get(srcVertex.getId()) != null) {
+				triangles=trgVertex.getValue().get(srcVertex.getId());
+			}
+			return new Tuple1<Integer>(triangles);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
new file mode 100644
index 0000000..047bbf7
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.example.utils.TriangleCountData;
+import org.apache.flink.graph.library.GSATriangleCount;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+@RunWith(Parameterized.class)
+public class TriangleCountITCase extends MultipleProgramsTestBase {
+
+	private String expectedResult;
+
+	public TriangleCountITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Test
+	public void testGSATriangleCount() throws Exception {
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
+				env).getUndirected();
+
+		List<Tuple1<Integer>> numberOfTriangles = graph.run(new GSATriangleCount()).collect();
+		expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
+
+		compareResultAsTuples(numberOfTriangles, expectedResult);
+	}
+}