You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/03/26 23:38:46 UTC

[1/2] flink git commit: [FLINK-1726][gelly] Added Community Detection Library and Example

Repository: flink
Updated Branches:
  refs/heads/master 1f726e482 -> 9eef3c86c


[FLINK-1726][gelly] Added Community Detection Library and Example

This closes #505


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e3ba403
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e3ba403
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e3ba403

Branch: refs/heads/master
Commit: 4e3ba4039d694e539dcdbca74fd628140f85d5e9
Parents: 1f726e4
Author: andralungu <lu...@gmail.com>
Authored: Fri Mar 20 16:43:59 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Thu Mar 26 23:36:00 2015 +0100

----------------------------------------------------------------------
 docs/gelly_guide.md                             |   1 +
 .../SimpleCommunityDetectionExample.java        | 129 +++++++++++++
 .../SingleSourceShortestPathsExample.java       |  10 +-
 .../utils/SimpleCommunityDetectionData.java     |  65 +++++++
 .../graph/library/SimpleCommunityDetection.java | 187 +++++++++++++++++++
 .../example/SimpleCommunityDetectionITCase.java | 100 ++++++++++
 6 files changed, 484 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 32c076b..0884405 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -402,6 +402,7 @@ Gelly has a growing collection of graph algorithms for easily analyzing large-sc
 * PageRank
 * Single-Source Shortest Paths
 * Label Propagation
+* Simple Community Detection
 
 Gelly's library methods can be used by simply calling the `run()` method on the input graph:
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
new file mode 100644
index 0000000..488603c
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SimpleCommunityDetectionData;
+import org.apache.flink.graph.library.SimpleCommunityDetection;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+
+/**
+ * This example shows how to use the {@link org.apache.flink.graph.library.SimpleCommunityDetection}
+ * library method:
+ * <ul>
+ * 	<li> with the edge data set given as a parameter
+ * 	<li> with default data
+ * </ul>
+ */
+public class SimpleCommunityDetectionExample implements ProgramDescription {
+
+	public static void main(String [] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		// set up the execution environment
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		// set up the graph
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+		Graph<Long, Long, Double> graph = Graph.fromDataSet(edges,
+				new MapFunction<Long, Long>() {
+					@Override
+					public Long map(Long label) throws Exception {
+						return label;
+					}
+				}, env);
+
+		// the result is in the form of <vertexId, communityId>, where the communityId is the label
+		// which the vertex converged to
+		DataSet<Vertex<Long, Long>> communityVertices =
+				graph.run(new SimpleCommunityDetection(maxIterations, delta)).getVertices();
+
+		// emit result
+		if (fileOutput) {
+			communityVertices.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			communityVertices.print();
+		}
+
+		env.execute("Executing Simple Community Detection Example");
+	}
+
+	@Override
+	public String getDescription() {
+		return "Simple Community Detection Example";
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+	private static Integer maxIterations = SimpleCommunityDetectionData.MAX_ITERATIONS;
+	private static Double delta = SimpleCommunityDetectionData.DELTA;
+
+	private static boolean parseParameters(String [] args) {
+		if(args.length > 0) {
+			if(args.length != 4) {
+				System.err.println("Usage SimpleCommunityDetection <edge path> <output path> " +
+						"<num iterations> <delta>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+			maxIterations = Integer.parseInt(args[2]);
+			delta = Double.parseDouble(args[3]);
+
+		} else {
+			System.out.println("Executing SimpleCommunityDetection example with default parameters and built-in default data.");
+			System.out.println("Provide parameters to read input data from files.");
+			System.out.println("Usage SimpleCommunityDetection <edge path> <output path> " +
+					"<num iterations> <delta>");
+		}
+
+		return true;
+	}
+
+	@SuppressWarnings("serial")
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.ignoreComments("#")
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class, Double.class)
+					.map(new Tuple3ToEdgeMap<Long, Double>());
+		} else {
+			return SimpleCommunityDetectionData.getDefaultEdgeDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
index ff523ce..22883a8 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
@@ -22,12 +22,12 @@ import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
 import org.apache.flink.graph.library.SingleSourceShortestPaths;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
  * This example implements the Single Source Shortest Paths algorithm,
@@ -126,13 +126,7 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 					.lineDelimiter("\n")
 					.fieldDelimiter("\t")
 					.types(Long.class, Long.class, Double.class)
-					.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
-
-						@Override
-						public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception {
-							return new Edge<Long, Double>(tuple3.f0, tuple3.f1, tuple3.f2);
-						}
-					});
+					.map(new Tuple3ToEdgeMap<Long, Double>());
 		} else {
 			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java
new file mode 100644
index 0000000..20b562b
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/SimpleCommunityDetectionData.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data set used for the Simple Community Detection example program.
+ * If no parameters are given to the program, the default edge data set is used.
+ */
+public class SimpleCommunityDetectionData {
+
+	// the algorithm is not guaranteed to always converge
+	public static final Integer MAX_ITERATIONS = 30;
+
+	public static final double DELTA = 0.5f;
+
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(1L, 3L, 2.0));
+		edges.add(new Edge<Long, Double>(1L, 4L, 3.0));
+		edges.add(new Edge<Long, Double>(2L, 3L, 4.0));
+		edges.add(new Edge<Long, Double>(2L, 4L, 5.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 6.0));
+		edges.add(new Edge<Long, Double>(5L, 6L, 7.0));
+		edges.add(new Edge<Long, Double>(5L, 7L, 8.0));
+		edges.add(new Edge<Long, Double>(6L, 7L, 9.0));
+		edges.add(new Edge<Long, Double>(7L, 12L, 10.0));
+		edges.add(new Edge<Long, Double>(8L, 9L, 11.0));
+		edges.add(new Edge<Long, Double>(8L, 10L, 12.0));
+		edges.add(new Edge<Long, Double>(8L, 11L, 13.0));
+		edges.add(new Edge<Long, Double>(9L, 10L, 14.0));
+		edges.add(new Edge<Long, Double>(9L, 11L, 15.0));
+		edges.add(new Edge<Long, Double>(10L, 11L, 16.0));
+		edges.add(new Edge<Long, Double>(10L, 12L, 17.0));
+		edges.add(new Edge<Long, Double>(11L, 12L, 18.0));
+
+		return env.fromCollection(edges);
+	}
+
+	private SimpleCommunityDetectionData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
new file mode 100644
index 0000000..5d3afc7
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Simple Community Detection Algorithm.
+ *
+ * Initially, each vertex is assigned a tuple formed of its own id along with a score equal to 1.0, as value.
+ * The vertices propagate their labels and max scores in iterations, each time adopting the label with the
+ * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction
+ * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value.
+ *
+ * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
+ * is reached.
+ *
+ * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
+ *
+ *<p>
+ * 	The input files is a plain text file and must be formatted as follows:
+ * 	<br>
+ * 	Edges are represented by tuples of srcVertexId, trgVertexId which are
+ * 	separated by tabs. Edges themselves are separated by newlines.
+ * 	For example: <code>1	2\n1	3\n</code> defines two edges 1-2 and 1-3
+ * </p>
+ *
+ * Usage <code>SimpleCommunityDetection &lt;edge path&gt; &lt;result path&gt;
+ * &lt;number of iterations&gt; &lt;delta&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SimpleCommunityDetectionData}
+ */
+public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, Double> {
+
+	private Integer maxIterations;
+
+	private Double delta;
+
+	public SimpleCommunityDetection(Integer maxIterations, Double delta) {
+
+		this.maxIterations = maxIterations;
+		this.delta = delta;
+	}
+
+	@Override
+	public Graph<Long, Long, Double> run(Graph<Long, Long, Double> graph) {
+
+		Graph<Long, Long, Double> undirectedGraph = graph.getUndirected();
+
+		Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph
+				.mapVertices(new AddScoreToVertexValuesMapper());
+
+		VertexCentricIteration<Long, Tuple2<Long, Double>, Tuple2<Long, Double>, Double>
+				iteration = graphWithScoredVertices.createVertexCentricIteration(new VertexLabelUpdater(delta),
+				new LabelMessenger(), maxIterations);
+
+		return graphWithScoredVertices.runVertexCentricIteration(iteration)
+				.mapVertices(new RemoveScoreFromVertexValuesMapper());
+	}
+
+	public static final class VertexLabelUpdater extends VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+		private Double delta;
+
+		public VertexLabelUpdater(Double delta) {
+			this.delta = delta;
+		}
+
+		@Override
+		public void updateVertex(Long vertexKey, Tuple2<Long, Double> labelScore,
+								MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
+
+			// we would like these two maps to be ordered
+			Map<Long, Double> receivedLabelsWithScores = new TreeMap<Long, Double>();
+			Map<Long, Double> labelsWithHighestScore = new TreeMap<Long, Double>();
+
+			for (Tuple2<Long, Double> message : inMessages) {
+				// split the message into received label and score
+				Long receivedLabel = message.f0;
+				Double receivedScore = message.f1;
+
+				// if the label was received before
+				if (receivedLabelsWithScores.containsKey(receivedLabel)) {
+					Double newScore = receivedScore + receivedLabelsWithScores.get(receivedLabel);
+					receivedLabelsWithScores.put(receivedLabel, newScore);
+				} else {
+					// first time we see the label
+					receivedLabelsWithScores.put(receivedLabel, receivedScore);
+				}
+
+				// store the labels with the highest scores
+				if (labelsWithHighestScore.containsKey(receivedLabel)) {
+					Double currentScore = labelsWithHighestScore.get(receivedLabel);
+					if (currentScore < receivedScore) {
+						// record the highest score
+						labelsWithHighestScore.put(receivedLabel, receivedScore);
+					}
+				} else {
+					// first time we see this label
+					labelsWithHighestScore.put(receivedLabel, receivedScore);
+				}
+			}
+
+			if(receivedLabelsWithScores.size() > 0) {
+				// find the label with the highest score from the ones received
+				Double maxScore = -Double.MAX_VALUE;
+				Long maxScoreLabel = labelScore.f0;
+				for (Long curLabel : receivedLabelsWithScores.keySet()) {
+
+					if (receivedLabelsWithScores.get(curLabel) > maxScore) {
+						maxScore = receivedLabelsWithScores.get(curLabel);
+						maxScoreLabel = curLabel;
+					}
+				}
+
+				// find the highest score of maxScoreLabel
+				Double highestScore = labelsWithHighestScore.get(maxScoreLabel);
+				// re-score the new label
+				if (maxScoreLabel != labelScore.f0) {
+					highestScore -= delta / getSuperstepNumber();
+				}
+				// else delta = 0
+				// update own label
+				setNewVertexValue(new Tuple2<Long, Double>(maxScoreLabel, highestScore));
+			}
+		}
+	}
+
+	public static final class LabelMessenger extends MessagingFunction<Long, Tuple2<Long, Double>,
+			Tuple2<Long, Double>, Double> {
+
+		@Override
+		public void sendMessages(Long vertexKey, Tuple2<Long, Double> vertexValue) throws Exception {
+
+			for(Edge<Long, Double> edge : getOutgoingEdges()) {
+				sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertexValue.f0, vertexValue.f1 * edge.getValue()));
+			}
+
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class AddScoreToVertexValuesMapper implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Double>> {
+
+		@Override
+		public Tuple2<Long, Double> map(Vertex<Long, Long> vertex) throws Exception {
+			return new Tuple2<Long, Double>(vertex.getValue(), 1.0);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class RemoveScoreFromVertexValuesMapper implements MapFunction<Vertex<Long, Tuple2<Long, Double>>, Long> {
+
+		@Override
+		public Long map(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
+			return vertex.getValue().f0;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3ba403/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java
new file mode 100644
index 0000000..def5006
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/SimpleCommunityDetectionITCase.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.SimpleCommunityDetectionExample;
+import org.apache.flink.graph.example.utils.SimpleCommunityDetectionData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class SimpleCommunityDetectionITCase extends MultipleProgramsTestBase {
+
+	private String edgesPath;
+
+	private String resultPath;
+
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public SimpleCommunityDetectionITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+
+	@Test
+	public void testSingleIteration() throws Exception {
+		/*
+		 * Test one iteration of the Simple Community Detection Example
+		 */
+		final String edges = "1	2	1.0\n" + "1	3	2.0\n" + "1	4	3.0\n" + "1	5	4.0\n" + "2	6	5.0\n" +
+				"6	7	6.0\n" + "6	8	7.0\n" + "7	8	8.0";
+		edgesPath = createTempFile(edges);
+
+		SimpleCommunityDetectionExample.main(new String[]{edgesPath, resultPath, "1",
+				SimpleCommunityDetectionData.DELTA + ""});
+
+		expected = "1,5\n" + "2,6\n" + "3,1\n" + "4,1\n" + "5,1\n" + "6,8\n" + "7,8\n" + "8,7";
+	}
+
+	@Test
+	public void testTieBreaker() throws Exception {
+		/*
+		 * Test one iteration of the Simple Community Detection Example where a tie must be broken
+		 */
+
+		final String edges = "1	2	1.0\n" + "1	3	1.0\n" + "1	4	1.0\n" + "1	5	1.0";
+		edgesPath = createTempFile(edges);
+
+		SimpleCommunityDetectionExample.main(new String[] {edgesPath, resultPath, "1",
+				SimpleCommunityDetectionData.DELTA + ""});
+
+		expected = "1,2\n" + "2,1\n" + "3,1\n" + "4,1\n" + "5,1";
+	}
+
+
+	// -------------------------------------------------------------------------
+	// Util methods
+	// -------------------------------------------------------------------------
+	private String createTempFile(final String rows) throws Exception {
+		File tempFile = tempFolder.newFile();
+		Files.write(rows, tempFile, Charsets.UTF_8);
+		return tempFile.toURI().toString();
+	}
+}


[2/2] flink git commit: [gelly] corrected community detection usage description and moved it from the library method to the example; removed unnecessary @SuppressWarnings(serial) from SSSP

Posted by va...@apache.org.
[gelly] corrected community detection usage description and moved it from the library method to the example;
removed unnecessary @SuppressWarnings(serial) from SSSP


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9eef3c86
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9eef3c86
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9eef3c86

Branch: refs/heads/master
Commit: 9eef3c86c8b34732178d0bce441c1e6145c5ea64
Parents: 4e3ba40
Author: vasia <va...@gmail.com>
Authored: Thu Mar 26 22:53:55 2015 +0100
Committer: Vasia Kalavri <va...@apache.org>
Committed: Thu Mar 26 23:37:54 2015 +0100

----------------------------------------------------------------------
 .../example/SimpleCommunityDetectionExample.java     | 13 ++++++++++++-
 .../example/SingleSourceShortestPathsExample.java    |  1 -
 .../graph/library/SimpleCommunityDetection.java      | 15 ++-------------
 3 files changed, 14 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9eef3c86/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
index 488603c..fb7ed3f 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SimpleCommunityDetectionExample.java
@@ -36,9 +36,21 @@ import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
  * 	<li> with the edge data set given as a parameter
  * 	<li> with default data
  * </ul>
+ *
+ * The input file is a plain text file and must be formatted as follows:
+ * Edges are represented by tuples of srcVertexId, trgVertexId, weight which are
+ * separated by tabs. Edges themselves are separated by newlines.
+ * For example: <code>1\t2\t1.0\n1\t3\t2.0\n</code> defines two edges,
+ * 1-2 with weight 1.0 and 1-3 with weight 2.0.
+ *
+ * Usage <code>SimpleCommunityDetection &lt;edge path&gt; &lt;result path&gt;
+ * &lt;number of iterations&gt; &lt;delta&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.SimpleCommunityDetectionData}
  */
 public class SimpleCommunityDetectionExample implements ProgramDescription {
 
+	@SuppressWarnings("serial")
 	public static void main(String [] args) throws Exception {
 
 		if(!parseParameters(args)) {
@@ -112,7 +124,6 @@ public class SimpleCommunityDetectionExample implements ProgramDescription {
 		return true;
 	}
 
-	@SuppressWarnings("serial")
 	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
 
 		if(fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9eef3c86/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
index 22883a8..768f441 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/SingleSourceShortestPathsExample.java
@@ -119,7 +119,6 @@ public class SingleSourceShortestPathsExample implements ProgramDescription {
 		return true;
 	}
 
-	@SuppressWarnings("serial")
 	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
 		if (fileOutput) {
 			return env.readCsvFile(edgesInputPath)

http://git-wip-us.apache.org/repos/asf/flink/blob/9eef3c86/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
index 5d3afc7..fb32781 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
@@ -44,19 +44,6 @@ import java.util.TreeMap;
  * is reached.
  *
  * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
- *
- *<p>
- * 	The input files is a plain text file and must be formatted as follows:
- * 	<br>
- * 	Edges are represented by tuples of srcVertexId, trgVertexId which are
- * 	separated by tabs. Edges themselves are separated by newlines.
- * 	For example: <code>1	2\n1	3\n</code> defines two edges 1-2 and 1-3
- * </p>
- *
- * Usage <code>SimpleCommunityDetection &lt;edge path&gt; &lt;result path&gt;
- * &lt;number of iterations&gt; &lt;delta&gt;</code><br>
- * If no parameters are provided, the program is run with default data from
- * {@link org.apache.flink.graph.example.utils.SimpleCommunityDetectionData}
  */
 public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, Double> {
 
@@ -86,6 +73,7 @@ public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, Doub
 				.mapVertices(new RemoveScoreFromVertexValuesMapper());
 	}
 
+	@SuppressWarnings("serial")
 	public static final class VertexLabelUpdater extends VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
 
 		private Double delta;
@@ -154,6 +142,7 @@ public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, Doub
 		}
 	}
 
+	@SuppressWarnings("serial")
 	public static final class LabelMessenger extends MessagingFunction<Long, Tuple2<Long, Double>,
 			Tuple2<Long, Double>, Double> {