You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by an...@apache.org on 2015/09/08 11:52:02 UTC
flink git commit: [FLINK-2570] [gelly] Added a Triangle Count Library
Method
Repository: flink
Updated Branches:
refs/heads/master 948b6e057 -> 97fb9a47c
[FLINK-2570] [gelly] Added a Triangle Count Library Method
[FLINK-2570] [gelly] Added a description of the I/O
This closes #1054
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/97fb9a47
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/97fb9a47
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/97fb9a47
Branch: refs/heads/master
Commit: 97fb9a47c6e7efa33b84b10fcc5454189002f6f3
Parents: 948b6e0
Author: Andra Lungu <lu...@gmail.com>
Authored: Tue Aug 25 14:07:18 2015 +0200
Committer: Andra Lungu <lu...@gmail.com>
Committed: Tue Sep 8 11:50:30 2015 +0200
----------------------------------------------------------------------
docs/libs/gelly_guide.md | 4 +
.../example/utils/CommunityDetectionData.java | 2 +-
.../example/utils/LabelPropagationData.java | 4 +
.../graph/example/utils/MusicProfilesData.java | 4 +
.../flink/graph/example/utils/PageRankData.java | 4 +
.../graph/example/utils/TriangleCountData.java | 56 ++++++
.../flink/graph/library/GSATriangleCount.java | 189 +++++++++++++++++++
.../graph/test/library/TriangleCountITCase.java | 56 ++++++
8 files changed, 318 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index e0e05b3..562df31 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -800,6 +800,10 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc
* Label Propagation
* Simple Community Detection
* Connected Components
+* GSA PageRank
+* GSA Connected Components
+* GSA Single-Source Shortest Paths
+* GSA Triangle Count
Gelly's library methods can be used by simply calling the `run()` method on the input graph:
http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
index 196de3a..c37b2b5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/CommunityDetectionData.java
@@ -26,7 +26,7 @@ import java.util.ArrayList;
import java.util.List;
/**
- * Provides the default data set used for the Simple Community Detection example program.
+ * Provides the default data set used for the Simple Community Detection test program.
* If no parameters are given to the program, the default edge data set is used.
*/
public class CommunityDetectionData {
http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
index b70a9c4..0a92097 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/LabelPropagationData.java
@@ -27,6 +27,10 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Vertex;
import org.apache.flink.types.NullValue;
+/**
+ * Provides the default data set used for the Label Propagation test program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
public class LabelPropagationData {
public static final String LABELS_AFTER_1_ITERATION = "1,10\n" +
http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
index 6b96372..3a97d1f 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/MusicProfilesData.java
@@ -25,6 +25,10 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple3;
+/**
+ * Provides the default data sets used for the Music Profiles example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
public class MusicProfilesData {
public static DataSet<Tuple3<String, String, Integer>> getUserSongTriplets(ExecutionEnvironment env) {
http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
index c84808a..58d4f8b 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/PageRankData.java
@@ -25,6 +25,10 @@ import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
+/**
+ * Provides the default data set used for the PageRank test program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
public class PageRankData {
public static final String EDGES = "2 1\n" +
http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
new file mode 100644
index 0000000..5b2cc3d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/TriangleCountData.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Default input and expected output for the Triangle Count test program.
+ *
+ * The same nine edges are offered in two forms: as text ({@link #EDGES}, one
+ * "source target" pair per line) and as a DataSet of {@link Edge} objects
+ * ({@link #getDefaultEdgeDataSet(ExecutionEnvironment)}).
+ */
+public class TriangleCountData {
+
+    /** The default edges as text: one "source target" pair per line. */
+    public static final String EDGES =
+            "1 2\n" +
+            "1 3\n" +
+            "2 3\n" +
+            "2 6\n" +
+            "3 4\n" +
+            "3 5\n" +
+            "3 6\n" +
+            "4 5\n" +
+            "6 7\n";
+
+    /** The number of triangles expected for the default edges, as text. */
+    public static final String RESULTED_NUMBER_OF_TRIANGLES = "3";
+
+    /** Not meant to be instantiated; the class only exposes static members. */
+    private TriangleCountData () {}
+
+    /**
+     * Builds the default edge data set.
+     *
+     * @param env the execution environment used to create the DataSet
+     * @return a DataSet holding the nine default edges, each carrying a {@link NullValue}
+     */
+    public static DataSet<Edge<Long, NullValue>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+        // {source, target} pairs, in the same order as the lines of EDGES
+        long[][] endpoints = {
+                {1L, 2L}, {1L, 3L}, {2L, 3L},
+                {2L, 6L}, {3L, 4L}, {3L, 5L},
+                {3L, 6L}, {4L, 5L}, {6L, 7L}
+        };
+
+        List<Edge<Long, NullValue>> edgeList = new ArrayList<Edge<Long, NullValue>>();
+        for (long[] pair : endpoints) {
+            edgeList.add(new Edge<Long, NullValue>(pair[0], pair[1], NullValue.getInstance()));
+        }
+
+        return env.fromCollection(edgeList);
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
new file mode 100644
index 0000000..95c70bf
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.ReduceNeighborsFunction;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.NullValue;
+
+import java.util.TreeMap;
+
+/**
+ * Triangle Count Algorithm.
+ *
+ * This algorithm operates in three phases. First, every vertex collects the ids of its neighbors
+ * with an id greater than its own. Each collected id is then propagated once more along the same
+ * (lower id to higher id) relation, so that a vertex ends up with a count per id reachable in two such steps.
+ * Finally, for every edge, the count stored at the lower-id endpoint under the id of the higher-id
+ * endpoint is read, and all these counts are summed up.
+ *
+ * This implementation is non-iterative.
+ *
+ * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet of
+ * Tuple1 which contains a single integer representing the number of triangles.
+ *
+ * NOTE(review): the result is produced by a plain reduce over the per-edge counts; for a graph
+ * without edges this presumably yields an empty DataSet rather than a 0 - confirm with callers.
+ */
+public class GSATriangleCount implements
+        GraphAlgorithm<Long, NullValue, NullValue, DataSet<Tuple1<Integer>>> {
+
+    /**
+     * Runs the triangle count on the given graph.
+     *
+     * @param input the graph to analyze; only its edges and its execution context are used
+     * @return a DataSet of Tuple1 holding the summed per-edge triangle counts
+     * @throws Exception declared by the Gelly operators used to build the program
+     */
+    @Override
+    public DataSet<Tuple1<Integer>> run(Graph<Long, NullValue, NullValue> input) throws Exception {
+
+        ExecutionEnvironment env = input.getContext();
+
+        // order the edges so that src is always higher than trg;
+        // distinct() then drops the duplicates this orientation creates
+        DataSet<Edge<Long, NullValue>> edges = input.getEdges()
+                .map(new OrderEdges()).distinct();
+
+        // every vertex starts with a map that contains only its own id, with count 1
+        Graph<Long, TreeMap<Long, Integer>, NullValue> graph = Graph.fromDataSet(edges,
+                new VertexInitializer(), env);
+
+        // select neighbors with ids higher than the current vertex id
+        // Gather: a no-op in this case
+        // Sum: create the set of neighbors
+        DataSet<Tuple2<Long, TreeMap<Long, Integer>>> higherIdNeighbors =
+                graph.reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN);
+
+        // same topology, but every vertex value reset to an empty map
+        Graph<Long, TreeMap<Long, Integer>, NullValue> graphWithReinitializedVertexValues =
+                graph.mapVertices(new VertexInitializerEmptyTreeMap());
+
+        // Apply: attach the computed values to the vertices
+        // joinWithVertices to update the node values
+        DataSet<Vertex<Long, TreeMap<Long, Integer>>> verticesWithHigherIdNeighbors =
+                graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues()).getVertices();
+
+        Graph<Long, TreeMap<Long,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors,
+                edges, env);
+
+        // propagate each received value to neighbors with higher id
+        // Gather: a no-op in this case
+        // Sum: propagate values
+        DataSet<Tuple2<Long, TreeMap<Long, Integer>>> propagatedValues = graphWithNeighbors
+                .reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN);
+
+        // Apply: attach propagated values to vertices
+        DataSet<Vertex<Long, TreeMap<Long, Integer>>> verticesWithPropagatedValues =
+                graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues()).getVertices();
+
+        Graph<Long, TreeMap<Long, Integer>, NullValue> graphWithPropagatedNeighbors =
+                Graph.fromDataSet(verticesWithPropagatedValues, graphWithNeighbors.getEdges(), env);
+
+        // Scatter: compute the number of triangles per edge, then add everything up
+        DataSet<Tuple1<Integer>> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets()
+                .map(new ComputeTriangles()).reduce(new SumTriangles());
+
+        return numberOfTriangles;
+    }
+
+    /**
+     * Orients an edge so that its source id is the larger of the two endpoint ids.
+     * Edges that already satisfy this (including self-loops) are returned unchanged.
+     */
+    @SuppressWarnings("serial")
+    private static final class OrderEdges implements MapFunction<Edge<Long, NullValue>, Edge<Long, NullValue>> {
+
+        @Override
+        public Edge<Long, NullValue> map(Edge<Long, NullValue> edge) throws Exception {
+            if (edge.getSource() < edge.getTarget()) {
+                return new Edge<Long, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance());
+            } else {
+                return edge;
+            }
+        }
+    }
+
+    /** Creates the initial vertex value: a map holding only the vertex's own id, mapped to 1. */
+    @SuppressWarnings("serial")
+    private static final class VertexInitializer implements MapFunction<Long, TreeMap<Long, Integer>> {
+
+        @Override
+        public TreeMap<Long, Integer> map(Long value) throws Exception {
+            TreeMap<Long, Integer> neighbors = new TreeMap<Long, Integer>();
+            neighbors.put(value, 1);
+
+            return neighbors;
+        }
+    }
+
+    /** Replaces any vertex value with a new, empty map. */
+    @SuppressWarnings("serial")
+    private static final class VertexInitializerEmptyTreeMap implements
+            MapFunction<Vertex<Long, TreeMap<Long, Integer>>, TreeMap<Long, Integer>> {
+
+        @Override
+        public TreeMap<Long, Integer> map(Vertex<Long, TreeMap<Long, Integer>> vertex) throws Exception {
+            return new TreeMap<Long, Integer>();
+        }
+    }
+
+    /** Join function that discards the current vertex value (f0) and keeps the joined one (f1). */
+    @SuppressWarnings("serial")
+    private static final class AttachValues implements MapFunction<Tuple2<TreeMap<Long, Integer>,
+            TreeMap<Long, Integer>>, TreeMap<Long, Integer>> {
+
+        @Override
+        public TreeMap<Long, Integer> map(Tuple2<TreeMap<Long, Integer>, TreeMap<Long, Integer>> tuple2) throws Exception {
+            return tuple2.f1;
+        }
+    }
+
+    /**
+     * Merges two neighbor maps by adding up the counts of ids present in both.
+     * The merge is done in place: {@code first} is modified and returned.
+     */
+    @SuppressWarnings("serial")
+    private static final class GatherHigherIdNeighbors implements ReduceNeighborsFunction<TreeMap<Long,Integer>> {
+
+        @Override
+        public TreeMap<Long,Integer> reduceNeighbors(TreeMap<Long,Integer> first, TreeMap<Long,Integer> second) {
+            // walk the entries so that the values of 'second' are not looked up a second time by key
+            for (java.util.Map.Entry<Long, Integer> entry : second.entrySet()) {
+                Long key = entry.getKey();
+                Integer value = first.get(key);
+                if (value != null) {
+                    first.put(key, value + entry.getValue());
+                } else {
+                    first.put(key, entry.getValue());
+                }
+            }
+            return first;
+        }
+    }
+
+    /**
+     * Emits, for one triplet, the count that the target vertex stores under the source vertex id,
+     * or 0 when the target vertex has no entry for that id.
+     */
+    @SuppressWarnings("serial")
+    private static final class ComputeTriangles implements MapFunction<Triplet<Long, TreeMap<Long, Integer>, NullValue>,
+            Tuple1<Integer>> {
+
+        @Override
+        public Tuple1<Integer> map(Triplet<Long, TreeMap<Long, Integer>, NullValue> triplet) throws Exception {
+
+            Vertex<Long, TreeMap<Long, Integer>> srcVertex = triplet.getSrcVertex();
+            Vertex<Long, TreeMap<Long, Integer>> trgVertex = triplet.getTrgVertex();
+
+            // single lookup; a missing entry means no triangle is closed by this edge
+            Integer count = trgVertex.getValue().get(srcVertex.getId());
+            int triangles = (count != null) ? count : 0;
+
+            return new Tuple1<Integer>(triangles);
+        }
+    }
+
+    /**
+     * Adds two partial triangle counts.
+     * Kept as a static nested class, like the other functions here, so that it carries
+     * no reference to the enclosing algorithm instance.
+     */
+    @SuppressWarnings("serial")
+    private static final class SumTriangles implements ReduceFunction<Tuple1<Integer>> {
+
+        @Override
+        public Tuple1<Integer> reduce(Tuple1<Integer> firstTuple, Tuple1<Integer> secondTuple) throws Exception {
+            return new Tuple1<Integer>(firstTuple.f0 + secondTuple.f0);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/97fb9a47/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
new file mode 100644
index 0000000..047bbf7
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.library;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.example.utils.TriangleCountData;
+import org.apache.flink.graph.library.GSATriangleCount;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.List;
+
+/**
+ * Integration test for {@link GSATriangleCount}: runs the algorithm on the default
+ * edges of {@link TriangleCountData} and compares the outcome with the expected count.
+ */
+@RunWith(Parameterized.class)
+public class TriangleCountITCase extends MultipleProgramsTestBase {
+
+    public TriangleCountITCase(TestExecutionMode mode) {
+        super(mode);
+    }
+
+    @Test
+    public void testGSATriangleCount() throws Exception {
+
+        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+        // the algorithm expects an undirected graph, so the default edges are made undirected first
+        Graph<Long, NullValue, NullValue> undirectedGraph =
+                Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env), env).getUndirected();
+
+        List<Tuple1<Integer>> actualCounts = undirectedGraph.run(new GSATriangleCount()).collect();
+
+        compareResultAsTuples(actualCounts, TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES);
+    }
+}