You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:01:05 UTC

[2/9] flink git commit: [FLINK-1514] [gelly] Removed GGC example, added Connected Components instead

[FLINK-1514] [gelly] Removed GGC example, added Connected Components instead


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1f56e9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1f56e9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1f56e9d

Branch: refs/heads/master
Commit: e1f56e9d767347c5c69f943921cc0cac29708036
Parents: 722719f
Author: Dániel Bali <ba...@gmail.com>
Authored: Fri Mar 20 15:35:53 2015 +0100
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 19:52:23 2015 +0200

----------------------------------------------------------------------
 .../example/GSAConnectedComponentsExample.java  | 208 +++++++++++++++++
 .../example/GSAGreedyGraphColoringExample.java  | 224 -------------------
 .../org/apache/flink/graph/gsa/RichEdge.java    |   3 +-
 .../flink/graph/test/GatherSumApplyITCase.java  |  31 +--
 4 files changed, 227 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
new file mode 100755
index 0000000..88938b5
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * Connected components implemented as a gather-sum-apply iteration: each vertex starts with its value as
+ * its component id, every edge contributes its source vertex's value, the contributions are reduced with
+ * min, and a vertex adopts that minimum only if it is smaller than its current value.
+ * Input files are space-separated: vertices as {@code <id> <value>}, edges as {@code <source> <target>}.
+ */
+public class GSAConnectedComponentsExample implements ProgramDescription {
+
+	// --------------------------------------------------------------------------------------------
+	//  Program
+	// --------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
+		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
+
+		// Gather the value of each edge's source vertex
+		GatherFunction<Long, NullValue, Long> gather = new ConnectedComponentsGather();
+
+		// Sum by keeping the lowest gathered value
+		SumFunction<Long, NullValue, Long> sum = new ConnectedComponentsSum();
+
+		// Apply the lowest value if it is smaller than the vertex's current value
+		ApplyFunction<Long, NullValue, Long> apply = new ConnectedComponentsApply();
+
+		// Execute the GSA iteration
+		GatherSumApplyIteration<Long, Long, NullValue, Long> iteration =
+				graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
+		Graph<Long, Long, NullValue> result = graph.runGatherSumApplyIteration(iteration);
+
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
+
+		// emit result
+		if (fileOutput) {
+			connectedComponents.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			connectedComponents.print();
+		}
+
+		env.execute("GSA Connected Components");
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Connected Components UDFs
+	// --------------------------------------------------------------------------------------------
+
+	private static final class ConnectedComponentsGather
+			extends GatherFunction<Long, NullValue, Long> {
+		@Override
+		public Long gather(RichEdge<Long, NullValue> richEdge) {
+			// NOTE(review): ids spread only from source to target, so undirected input presumably needs both edge directions - confirm
+			return richEdge.getSrcVertexValue();
+		}
+	}
+
+	private static final class ConnectedComponentsSum
+			extends SumFunction<Long, NullValue, Long> {
+		@Override
+		public Long sum(Long newValue, Long currentValue) {
+			return Math.min(newValue, currentValue);
+		}
+	}
+
+	private static final class ConnectedComponentsApply
+			extends ApplyFunction<Long, NullValue, Long> {
+		@Override
+		public void apply(Long summedValue, Long origValue) {
+			// Only set a new value when it is smaller; vertices without a smaller value are left unchanged
+			if (summedValue < origValue) {
+				setResult(summedValue);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Util methods
+	// --------------------------------------------------------------------------------------------
+
+	private static boolean fileOutput = false;
+	private static String vertexInputPath = null;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static int maxIterations = 16;
+
+	private static boolean parseParameters(String[] args) {
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+
+			if (args.length != 4) {
+				System.err.println("Usage: GSAConnectedComponentsExample <vertex path> <edge path> " +
+						"<result path> <max iterations>");
+				return false;
+			}
+
+			vertexInputPath = args[0];
+			edgeInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
+		} else {
+			System.out.println("Executing GSA Connected Components example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: GSAConnectedComponentsExample <vertex path> <edge path> "
+					+ "<result path> <max iterations>");
+		}
+		return true;
+	}
+
+	private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env
+					.readCsvFile(vertexInputPath)
+					.fieldDelimiter(" ")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Vertex<Long, Long>>() {
+						@Override
+						public Vertex<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+							return new Vertex<Long, Long>(value.f0, value.f1);
+						}
+					});
+		}
+
+		return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Long>>() {
+			@Override
+			public Vertex<Long, Long> map(Long value) throws Exception {
+				return new Vertex<Long, Long>(value, value);
+			}
+		});
+	}
+
+	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter(" ")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+						@Override
+						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+						}
+					});
+		}
+
+		// Generates 3 components of size 2: {0, 3}, {1, 4} and {2, 5}
+		return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+			@Override
+			public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
+				out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
+			}
+		});
+	}
+
+	@Override
+	public String getDescription() {
+		return "GSA Connected Components";
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
deleted file mode 100755
index 4acd86c..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.GatherSumApplyIteration;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.RichEdge;
-import org.apache.flink.util.Collector;
-
-import java.util.HashSet;
-
-/**
- * This is an implementation of the Greedy Graph Coloring algorithm, using a gather-sum-apply iteration
- */
-public class GSAGreedyGraphColoringExample implements ProgramDescription {
-
-	// --------------------------------------------------------------------------------------------
-	//  Program
-	// --------------------------------------------------------------------------------------------
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
-		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
-
-		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
-
-		// Gather the target vertices into a one-element set
-		GatherFunction<Double, Double, HashSet<Double>> gather = new GreedyGraphColoringGather();
-
-		// Merge the sets between neighbors
-		SumFunction<Double, Double, HashSet<Double>> sum = new GreedyGraphColoringSum();
-
-		// Find the minimum vertex id in the set which will be propagated
-		ApplyFunction<Double, Double, HashSet<Double>> apply = new GreedyGraphColoringApply();
-
-		// Execute the GSA iteration
-		GatherSumApplyIteration<Long, Double, Double, HashSet<Double>> iteration =
-				graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
-		Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(iteration);
-
-		// Extract the vertices as the result
-		DataSet<Vertex<Long, Double>> greedyGraphColoring = result.getVertices();
-
-		// emit result
-		if (fileOutput) {
-			greedyGraphColoring.writeAsCsv(outputPath, "\n", " ");
-		} else {
-			greedyGraphColoring.print();
-		}
-
-		env.execute("GSA Greedy Graph Coloring");
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Greedy Graph Coloring UDFs
-	// --------------------------------------------------------------------------------------------
-
-	private static final class GreedyGraphColoringGather
-			extends GatherFunction<Double, Double, HashSet<Double>> {
-		@Override
-		public HashSet<Double> gather(RichEdge<Double, Double> richEdge) {
-
-			HashSet<Double> result = new HashSet<Double>();
-			result.add(richEdge.getSrcVertexValue());
-
-			return result;
-		}
-	};
-
-	private static final class GreedyGraphColoringSum
-			extends SumFunction<Double, Double, HashSet<Double>> {
-		@Override
-		public HashSet<Double> sum(HashSet<Double> newValue, HashSet<Double> currentValue) {
-
-			HashSet<Double> result = new HashSet<Double>();
-			result.addAll(newValue);
-			result.addAll(currentValue);
-
-			return result;
-		}
-	};
-
-	private static final class GreedyGraphColoringApply
-			extends ApplyFunction<Double, Double, HashSet<Double>> {
-		@Override
-		public void apply(HashSet<Double> set, Double src) {
-			double minValue = src;
-			for (Double d : set) {
-				if (d < minValue) {
-					minValue = d;
-				}
-			}
-
-			// This is the condition that enables the termination of the iteration
-			if (minValue < src) {
-				setResult(minValue);
-			}
-		}
-	};
-
-	// --------------------------------------------------------------------------------------------
-	//  Util methods
-	// --------------------------------------------------------------------------------------------
-
-	private static boolean fileOutput = false;
-	private static String vertexInputPath = null;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-
-	private static int maxIterations = 16;
-
-	private static boolean parseParameters(String[] args) {
-
-		if(args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-
-			if(args.length != 4) {
-				System.err.println("Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> " +
-						"<result path> <max iterations>");
-				return false;
-			}
-
-			vertexInputPath = args[0];
-			edgeInputPath = args[1];
-			outputPath = args[2];
-			maxIterations = Integer.parseInt(args[3]);
-		} else {
-			System.out.println("Executing GSA Greedy Graph Coloring example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> "
-					+ "<result path> <max iterations>");
-		}
-		return true;
-	}
-
-	private static DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			return env
-					.readCsvFile(vertexInputPath)
-					.fieldDelimiter(" ")
-					.lineDelimiter("\n")
-					.types(Long.class, Double.class)
-					.map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
-						@Override
-						public Vertex<Long, Double> map(Tuple2<Long, Double> value) throws Exception {
-							return new Vertex<Long, Double>(value.f0, value.f1);
-						}
-					});
-		}
-
-		return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Double>>() {
-			@Override
-			public Vertex<Long, Double> map(Long value) throws Exception {
-				return new Vertex<Long, Double>(value, (double) value);
-			}
-		});
-	}
-
-	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter(" ")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class, Double.class)
-					.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
-						@Override
-						public Edge<Long, Double> map(Tuple3<Long, Long, Double> value) throws Exception {
-							return new Edge<Long, Double>(value.f0, value.f1, value.f2);
-						}
-					});
-		}
-
-		return env.generateSequence(0, 5).flatMap(new FlatMapFunction<Long, Edge<Long, Double>>() {
-			@Override
-			public void flatMap(Long value, Collector<Edge<Long, Double>> out) throws Exception {
-				out.collect(new Edge<Long, Double>(value, (value + 1) % 6, 0.0));
-				out.collect(new Edge<Long, Double>(value, (value + 2) % 6, 0.0));
-			}
-		});
-	}
-
-	@Override
-	public String getDescription() {
-		return "GSA Greedy Graph Coloring";
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
index 9befccb..8d4b4d8 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
@@ -23,7 +23,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import java.io.Serializable;
 
 /**
- * A wrapper around Tuple3<VV, EV, VV> for convenience in the GatherFunction
+ * This class represents a {@code <source vertex value, edge value>} pair.
+ * It is a wrapper around {@code Tuple2<VV, EV>} for convenience in the {@link GatherFunction}.
  * @param <VV> the vertex value type
  * @param <EV> the edge value type
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 321d530..0f5fe47 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.graph.test;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
-import org.apache.flink.graph.example.GSAGreedyGraphColoringExample;
+import org.apache.flink.graph.example.GSAConnectedComponentsExample;
 import org.apache.flink.graph.example.GSASingleSourceShortestPathsExample;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.After;
@@ -68,17 +68,18 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	//  Greedy Graph Coloring Test
+	//  Connected Components Test
 	// --------------------------------------------------------------------------------------------
 
 	@Test
 	public void testGreedyGraphColoring() throws Exception {
-		GSAGreedyGraphColoringExample.main(new String[] {verticesPath, edgesPath, resultPath, "16"});
-		expectedResult = "1 1.0\n" +
-				"2 1.0\n" +
-				"3 1.0\n" +
-				"4 1.0\n" +
-				"5 1.0\n";
+		GSAConnectedComponentsExample.main(new String[]{verticesPath, edgesPath, resultPath, "16"});
+		expectedResult = "1 1\n" +
+				"2 1\n" +
+				"3 1\n" +
+				"4 1\n" +
+				"5 1\n" +
+				"6 6\n";
 
 	}
 
@@ -93,7 +94,8 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 				"2 12.0\n" +
 				"3 13.0\n" +
 				"4 47.0\n" +
-				"5 48.0\n";
+				"5 48.0\n" +
+				"6 Infinity\n";
 	}
 
 
@@ -101,11 +103,12 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 	//  Sample data
 	// --------------------------------------------------------------------------------------------
 
-	private static final String VERTICES = "1 1.0\n" +
-			"2 2.0\n" +
-			"3 3.0\n" +
-			"4 4.0\n" +
-			"5 5.0\n";
+	private static final String VERTICES = "1 1\n" + // "<id> <value>"; integer values so the Long-typed CC example can parse them
+			"2 2\n" +
+			"3 3\n" +
+			"4 4\n" +
+			"5 5\n" +
+			"6 6\n";
 
 	private static final String EDGES = "1 2 12.0\n" +
 			"1 3 13.0\n" +