You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/03/26 23:38:46 UTC
[1/2] flink git commit: [FLINK-1726][gelly] Added Community Detection
Library and Example
Repository: flink
Updated Branches:
refs/heads/master 1f726e482 -> 9eef3c86c
[FLINK-1726][gelly] Added Community Detection Library and Example
This closes #505
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e3ba403
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e3ba403
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e3ba403
Branch: refs/heads/master
Commit: 4e3ba4039d694e539dcdbca74fd628140f85d5e9
Parents: 1f726e4
Author: andralungu <lu...@gmail.com>
Authored: Fri Mar 20 16:43:59 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Thu Mar 26 23:36:00 2015 +0100
----------------------------------------------------------------------
docs/gelly_guide.md | 1 +
.../SimpleCommunityDetectionExample.java | 129 +++++++++++++
.../SingleSourceShortestPathsExample.java | 10 +-
.../utils/SimpleCommunityDetectionData.java | 65 +++++++
.../graph/library/SimpleCommunityDetection.java | 187 +++++++++++++++++++
.../example/SimpleCommunityDetectionITCase.java | 100 ++++++++++
6 files changed, 484 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 32c076b..0884405 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -402,6 +402,7 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc
* PageRank
* Single-Source Shortest Paths
* Label Propagation
+* Simple Community Detection
Gelly's library methods can be used by simply calling the `run()` method on the input graph:
http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
new file mode 100644
index 0000000..488603c
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SimpleCommunityDetectionData;
+import org.apache.flink.graph.library.SimpleCommunityDetection;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example shows how to use the {@link org.apache.flink.graph.library.SimpleCommunityDetection}
+ * library method:
+ * <ul>
+ * <li> with the edge data set given as a parameter
+ * <li> with default data
+ * </ul>
+ */
+public class SimpleCommunityDetectionExample implements ProgramDescription {
+
+ public static void main(String [] args) throws Exception {
+
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ // set up the execution environment
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ // set up the graph
+ DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+ Graph<Long, Long, Double> graph = Graph.fromDataSet(edges,
+ new MapFunction<Long, Long>() {
+ @Override
+ public Long map(Long label) throws Exception {
+ return label;
+ }
+ }, env);
+
+ // the result is in the form of <vertexId, communityId>, where the communityId is the label
+ // which the vertex converged to
+ DataSet<Vertex<Long, Long>> communityVertices =
+ graph.run(new SimpleCommunityDetection(maxIterations, delta)).getVertices();
+
+ // emit result
+ if (fileOutput) {
+ communityVertices.writeAsCsv(outputPath, "\n", ",");
+ } else {
+ communityVertices.print();
+ }
+
+ env.execute("Executing Simple Community Detection Example");
+ }
+
+ @Override
+ public String getDescription() {
+ return "Simple Community Detection Example";
+ }
+
+ // *************************************************************************
+ // UTIL METHODS
+ // *************************************************************************
+
+ private static boolean fileOutput = false;
+ private static String edgeInputPath = null;
+ private static String outputPath = null;
+ private static Integer maxIterations = SimpleCommunityDetectionData.MAX_ITERATIONS;
+ private static Double delta = SimpleCommunityDetectionData.DELTA;
+
+ private static boolean parseParameters(String [] args) {
+ if(args.length > 0) {
+ if(args.length != 4) {
+ System.err.println("Usage SimpleCommunityDetection <edge path> <output path> " +
+ "<num iterations> <delta>");
+ return false;
+ }
+
+ fileOutput = true;
+ edgeInputPath = args[0];
+ outputPath = args[1];
+ maxIterations = Integer.parseInt(args[2]);
+ delta = Double.parseDouble(args[3]);
+
+ } else {
+ System.out.println("Executing SimpleCommunityDetection example with default parameters and built-in default data.");
+ System.out.println("Provide parameters to read input data from files.");
+ System.out.println("Usage SimpleCommunityDetection <edge path> <output path> " +
+ "<num iterations> <delta>");
+ }
+
+ return true;
+ }
+
+ @SuppressWarnings("serial")
+ private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+
+ if(fileOutput) {
+ return env.readCsvFile(edgeInputPath)
+ .ignoreComments("#")
+ .fieldDelimiter("\t")
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class, Double.class)
+ .map(new Tuple3ToEdgeMap<Long, Double>());
+ } else {
+ return SimpleCommunityDetectionData.getDefaultEdgeDataSet(env);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
index ff523ce..22883a8 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
@@ -22,12 +22,12 @@ import org.apache.flink.api.common.ProgramDescription;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
import org.apache.flink.graph.library.SingleSourceShortestPaths;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
/**
* This example implements the Single Source Shortest Paths algorithm,
@@ -126,13 +126,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
.lineDelimiter("\n")
.fieldDelimiter("\t")
.types(Long.class, Long.class, Double.class)
- .map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
-
- @Override
- public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception {
- return new Edge<Long, Double>(tuple3.f0, tuple3.f1, tuple3.f2);
- }
- });
+ .map(new Tuple3ToEdgeMap<Long, Double>());
} else {
return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java
new file mode 100644
index 0000000..20b562b
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data set used for the Simple Community Detection example program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
+public class SimpleCommunityDetectionData {
+
+ // the algorithm is not guaranteed to always converge
+ public static final Integer MAX_ITERATIONS = 30;
+
+ public static final double DELTA = 0.5f;
+
+ public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+ List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+ edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+ edges.add(new Edge<Long, Double>(1L, 3L, 2.0));
+ edges.add(new Edge<Long, Double>(1L, 4L, 3.0));
+ edges.add(new Edge<Long, Double>(2L, 3L, 4.0));
+ edges.add(new Edge<Long, Double>(2L, 4L, 5.0));
+ edges.add(new Edge<Long, Double>(3L, 5L, 6.0));
+ edges.add(new Edge<Long, Double>(5L, 6L, 7.0));
+ edges.add(new Edge<Long, Double>(5L, 7L, 8.0));
+ edges.add(new Edge<Long, Double>(6L, 7L, 9.0));
+ edges.add(new Edge<Long, Double>(7L, 12L, 10.0));
+ edges.add(new Edge<Long, Double>(8L, 9L, 11.0));
+ edges.add(new Edge<Long, Double>(8L, 10L, 12.0));
+ edges.add(new Edge<Long, Double>(8L, 11L, 13.0));
+ edges.add(new Edge<Long, Double>(9L, 10L, 14.0));
+ edges.add(new Edge<Long, Double>(9L, 11L, 15.0));
+ edges.add(new Edge<Long, Double>(10L, 11L, 16.0));
+ edges.add(new Edge<Long, Double>(10L, 12L, 17.0));
+ edges.add(new Edge<Long, Double>(11L, 12L, 18.0));
+
+ return env.fromCollection(edges);
+ }
+
+ private SimpleCommunityDetectionData() {}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
new file mode 100644
index 0000000..5d3afc7
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Simple Community Detection Algorithm.
+ *
+ * Initially, each vertex is assigned a tuple formed of its own id along with a score equal to 1.0, as value.
+ * The vertices propagate their labels and max scores in iterations, each time adopting the label with the
+ * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction
+ * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value.
+ *
+ * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
+ * is reached.
+ *
+ * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
+ *
+ *<p>
+ * The input files is a plain text file and must be formatted as follows:
+ * <br>
+ * Edges are represented by tuples of srcVertexId, trgVertexId which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1 2\n1 3\n</code> defines two edges 1-2 and 1-3
+ * </p>
+ *
+ * Usage <code>SimpleCommunityDetection <edge path> <result path>
+ * <number of iterations> <delta></code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SimpleCommunityDetectionData}
+ */
+public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, Double> {
+
+ private Integer maxIterations;
+
+ private Double delta;
+
+ public SimpleCommunityDetection(Integer maxIterations, Double delta) {
+
+ this.maxIterations = maxIterations;
+ this.delta = delta;
+ }
+
+ @Override
+ public Graph<Long, Long, Double> run(Graph<Long, Long, Double> graph) {
+
+ Graph<Long, Long, Double> undirectedGraph = graph.getUndirected();
+
+ Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph
+ .mapVertices(new AddScoreToVertexValuesMapper());
+
+ VertexCentricIteration<Long, Tuple2<Long, Double>, Tuple2<Long, Double>, Double>
+ iteration = graphWithScoredVertices.createVertexCentricIteration(new VertexLabelUpdater(delta),
+ new LabelMessenger(), maxIterations);
+
+ return graphWithScoredVertices.runVertexCentricIteration(iteration)
+ .mapVertices(new RemoveScoreFromVertexValuesMapper());
+ }
+
+ public static final class VertexLabelUpdater extends VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+ private Double delta;
+
+ public VertexLabelUpdater(Double delta) {
+ this.delta = delta;
+ }
+
+ /**
+  * Adopts the label with the largest summed score among the received messages.
+  *
+  * The adopted label is stored together with the highest single score received for
+  * it; when the label differs from the current one, that score is reduced by
+  * delta / superstep number. Nothing is updated if no messages were received.
+  *
+  * @param vertexKey the id of the vertex being updated
+  * @param labelScore the current (label, score) value of the vertex
+  * @param inMessages the (label, score) messages received in this superstep
+  */
+ @Override
+ public void updateVertex(Long vertexKey, Tuple2<Long, Double> labelScore,
+         MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
+
+     // sorted maps make the tie-break deterministic: the smallest label wins
+     Map<Long, Double> receivedLabelsWithScores = new TreeMap<Long, Double>();
+     Map<Long, Double> labelsWithHighestScore = new TreeMap<Long, Double>();
+
+     for (Tuple2<Long, Double> message : inMessages) {
+         Long receivedLabel = message.f0;
+         Double receivedScore = message.f1;
+
+         // accumulate the total score received for this label
+         Double summedSoFar = receivedLabelsWithScores.get(receivedLabel);
+         receivedLabelsWithScores.put(receivedLabel,
+                 summedSoFar == null ? receivedScore : receivedScore + summedSoFar);
+
+         // remember the highest single score received for this label
+         Double bestSoFar = labelsWithHighestScore.get(receivedLabel);
+         if (bestSoFar == null || bestSoFar < receivedScore) {
+             labelsWithHighestScore.put(receivedLabel, receivedScore);
+         }
+     }
+
+     if (!receivedLabelsWithScores.isEmpty()) {
+         // find the label with the highest summed score from the ones received
+         double maxScore = -Double.MAX_VALUE;
+         Long maxScoreLabel = labelScore.f0;
+         for (Map.Entry<Long, Double> labelWithScore : receivedLabelsWithScores.entrySet()) {
+             if (labelWithScore.getValue() > maxScore) {
+                 maxScore = labelWithScore.getValue();
+                 maxScoreLabel = labelWithScore.getKey();
+             }
+         }
+
+         // the new score starts from the highest single score of the chosen label
+         Double highestScore = labelsWithHighestScore.get(maxScoreLabel);
+         // re-score only when the label changes; compare by value, because
+         // '!=' on boxed Long objects compares references and would report
+         // equal labels held in distinct objects as different
+         if (!maxScoreLabel.equals(labelScore.f0)) {
+             highestScore -= delta / getSuperstepNumber();
+         }
+
+         setNewVertexValue(new Tuple2<Long, Double>(maxScoreLabel, highestScore));
+     }
+ }
+ }
+
+ public static final class LabelMessenger extends MessagingFunction<Long, Tuple2<Long, Double>,
+ Tuple2<Long, Double>, Double> {
+
+ @Override
+ public void sendMessages(Long vertexKey, Tuple2<Long, Double> vertexValue) throws Exception {
+
+ for(Edge<Long, Double> edge : getOutgoingEdges()) {
+ sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertexValue.f0, vertexValue.f1 * edge.getValue()));
+ }
+
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class AddScoreToVertexValuesMapper implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Double>> {
+
+ @Override
+ public Tuple2<Long, Double> map(Vertex<Long, Long> vertex) throws Exception {
+ return new Tuple2<Long, Double>(vertex.getValue(), 1.0);
+ }
+ }
+
+ @SuppressWarnings("serial")
+ public static final class RemoveScoreFromVertexValuesMapper implements MapFunction<Vertex<Long, Tuple2<Long, Double>>, Long> {
+
+ @Override
+ public Long map(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
+ return vertex.getValue().f0;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java
new file mode 100644
index 0000000..def5006
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.SimpleCommunityDetectionExample;
+import org.apache.flink.graph.example.utils.SimpleCommunityDetectionData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class SimpleCommunityDetectionITCase extends MultipleProgramsTestBase {
+
+ private String edgesPath;
+
+ private String resultPath;
+
+ private String expected;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ public SimpleCommunityDetectionITCase(TestExecutionMode mode) {
+ super(mode);
+ }
+
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ }
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expected, resultPath);
+ }
+
+ @Test
+ public void testSingleIteration() throws Exception {
+ /*
+ * Test one iteration of the Simple Community Detection Example
+ */
+ final String edges = "1 2 1.0\n" + "1 3 2.0\n" + "1 4 3.0\n" + "1 5 4.0\n" + "2 6 5.0\n" +
+ "6 7 6.0\n" + "6 8 7.0\n" + "7 8 8.0";
+ edgesPath = createTempFile(edges);
+
+ SimpleCommunityDetectionExample.main(new String[]{edgesPath, resultPath, "1",
+ SimpleCommunityDetectionData.DELTA + ""});
+
+ expected = "1,5\n" + "2,6\n" + "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + "8,7";
+ }
+
+ @Test
+ public void testTieBreaker() throws Exception {
+ /*
+ * Test one iteration of the Simple Community Detection Example where a tie must be broken
+ */
+
+ final String edges = "1 2 1.0\n" + "1 3 1.0\n" + "1 4 1.0\n" + "1 5 1.0";
+ edgesPath = createTempFile(edges);
+
+ SimpleCommunityDetectionExample.main(new String[] {edgesPath, resultPath, "1",
+ SimpleCommunityDetectionData.DELTA + ""});
+
+ expected = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1";
+ }
+
+
+ // -------------------------------------------------------------------------
+ // Util methods
+ // -------------------------------------------------------------------------
+ private String createTempFile(final String rows) throws Exception {
+ File tempFile = tempFolder.newFile();
+ Files.write(rows, tempFile, Charsets.UTF_8);
+ return tempFile.toURI().toString();
+ }
+}
[2/2] flink git commit: [gelly] corrected community detection usage
description and moved it from the library method to the example;
removed unnecessary @SuppressWarnings(serial) from SSSP
Posted by va...@apache.org.
[gelly] corrected community detection usage description and moved it from the library method to the example;
removed unnecessary @SuppressWarnings(serial) from SSSP
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9eef3c86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9eef3c86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9eef3c86
Branch: refs/heads/master
Commit: 9eef3c86c8b34732178d0bce441c1e6145c5ea64
Parents: 4e3ba40
Author: vasia <va...@gmail.com>
Authored: Thu Mar 26 22:53:55 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Thu Mar 26 23:37:54 2015 +0100
----------------------------------------------------------------------
.../example/SimpleCommunityDetectionExample.java | 13 ++++++++++++-
.../example/SingleSourceShortestPathsExample.java | 1 -
.../graph/library/SimpleCommunityDetection.java | 15 ++-------------
3 files changed, 14 insertions(+), 15 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9eef3c86/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
index 488603c..fb7ed3f 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
@@ -36,9 +36,21 @@ import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
* <li> with the edge data set given as a parameter
* <li> with default data
* </ul>
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, weight which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t1.0\n1\t3\t2.0\n</code> defines two edges,
+ * 1-2 with weight 1.0 and 1-3 with weight 2.0.
+ *
+ * Usage <code>SimpleCommunityDetection &lt;edge path&gt; &lt;result path&gt;
+ * &lt;number of iterations&gt; &lt;delta&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SimpleCommunityDetectionData}
*/
public class SimpleCommunityDetectionExample implements ProgramDescription {
+ @SuppressWarnings("serial")
public static void main(String [] args) throws Exception {
if(!parseParameters(args)) {
@@ -112,7 +124,6 @@ public class SimpleCommunityDetectionExample implements ProgramDescription {
return true;
}
- @SuppressWarnings("serial")
private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
if(fileOutput) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9eef3c86/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
index 22883a8..768f441 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
@@ -119,7 +119,6 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
return true;
}
- @SuppressWarnings("serial")
private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
if (fileOutput) {
return env.readCsvFile(edgesInputPath)
http://git-wip-us.apache.org/repos/asf/flink/blob/9eef3c86/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
index 5d3afc7..fb32781 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
@@ -44,19 +44,6 @@ import java.util.TreeMap;
* is reached.
*
* @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
- *
- *<p>
- * The input files is a plain text file and must be formatted as follows:
- * <br>
- * Edges are represented by tuples of srcVertexId, trgVertexId which are
- * separated by tabs. Edges themselves are separated by newlines.
- * For example: <code>1 2\n1 3\n</code> defines two edges 1-2 and 1-3
- * </p>
- *
- * Usage <code>SimpleCommunityDetection <edge path> <result path>
- * <number of iterations> <delta></code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.SimpleCommunityDetectionData}
*/
public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, Double> {
@@ -86,6 +73,7 @@ public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, Doub
.mapVertices(new RemoveScoreFromVertexValuesMapper());
}
+ @SuppressWarnings("serial")
public static final class VertexLabelUpdater extends VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
private Double delta;
@@ -154,6 +142,7 @@ public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, Doub
}
}
+ @SuppressWarnings("serial")
public static final class LabelMessenger extends MessagingFunction<Long, Tuple2<Long, Double>,
Tuple2<Long, Double>, Double> {