You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:01:05 UTC
[2/9] flink git commit: [FLINK-1514] [gelly] Removed GGC example,
added Connected Components instead
[FLINK-1514] [gelly] Removed GGC (Greedy Graph Coloring) example, added Connected Components instead
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1f56e9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1f56e9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1f56e9d
Branch: refs/heads/master
Commit: e1f56e9d767347c5c69f943921cc0cac29708036
Parents: 722719f
Author: Dániel Bali <ba...@gmail.com>
Authored: Fri Mar 20 15:35:53 2015 +0100
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 19:52:23 2015 +0200
----------------------------------------------------------------------
.../example/GSAConnectedComponentsExample.java | 208 +++++++++++++++++
.../example/GSAGreedyGraphColoringExample.java | 224 -------------------
.../org/apache/flink/graph/gsa/RichEdge.java | 3 +-
.../flink/graph/test/GatherSumApplyITCase.java | 31 +--
4 files changed, 227 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
new file mode 100755
index 0000000..88938b5
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * Connected Components via a gather-sum-apply (GSA) iteration: each vertex repeatedly adopts the smallest value offered by the source vertices of its edges (min-label propagation). NOTE(review): values presumably flow only source -> target, so undirected input may need edges in both directions; confirm against the GSA iteration.
+ */
+public class GSAConnectedComponentsExample implements ProgramDescription {
+
+	// --------------------------------------------------------------------------------------------
+	//  Program
+	// --------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
+		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
+
+		// Gather: offer the source vertex's current value along each edge
+		GatherFunction<Long, NullValue, Long> gather = new ConnectedComponentsGather();
+
+		// Sum: keep the smallest of the offered values
+		SumFunction<Long, NullValue, Long> sum = new ConnectedComponentsSum();
+
+		// Apply: adopt that minimum only if it is smaller than the vertex's current value
+		ApplyFunction<Long, NullValue, Long> apply = new ConnectedComponentsApply();
+
+		// Execute the GSA iteration, bounded by maxIterations
+		GatherSumApplyIteration<Long, Long, NullValue, Long> iteration =
+				graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
+		Graph<Long, Long, NullValue> result = graph.runGatherSumApplyIteration(iteration);
+
+		// Extract the vertices as the result: (vertex ID, component ID)
+		DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
+
+		// emit result
+		if (fileOutput) {
+			connectedComponents.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			connectedComponents.print();
+		}
+
+		env.execute("GSA Connected Components");
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Connected Components UDFs
+	// --------------------------------------------------------------------------------------------
+
+	private static final class ConnectedComponentsGather
+			extends GatherFunction<Long, NullValue, Long> {
+		@Override
+		public Long gather(RichEdge<Long, NullValue> richEdge) {
+			// Candidate component ID: the current value of the edge's source vertex
+			return richEdge.getSrcVertexValue();
+		}
+	}
+
+	private static final class ConnectedComponentsSum
+			extends SumFunction<Long, NullValue, Long> {
+		@Override
+		public Long sum(Long newValue, Long currentValue) {
+			// min is associative and commutative, so partial results can be combined in any order
+			return Math.min(newValue, currentValue);
+		}
+	}
+
+	private static final class ConnectedComponentsApply
+			extends ApplyFunction<Long, NullValue, Long> {
+		@Override
+		public void apply(Long summedValue, Long origValue) {
+			// Update only on improvement; not calling setResult for unchanged vertices lets the iteration terminate
+			if (summedValue < origValue) {
+				setResult(summedValue);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Util methods
+	// --------------------------------------------------------------------------------------------
+
+	private static boolean fileOutput = false;
+	private static String vertexInputPath = null;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static int maxIterations = 16;
+
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+
+			if (args.length != 4) {
+				System.err.println("Usage: GSAConnectedComponentsExample <vertex path> <edge path> " +
+						"<result path> <max iterations>");
+				return false;
+			}
+
+			vertexInputPath = args[0];
+			edgeInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
+		} else {
+			System.out.println("Executing GSA Connected Components example with built-in default data.");
+			System.out.println(" Provide parameters to read input data from files.");
+			System.out.println(" See the documentation for the correct format of input files.");
+			System.out.println(" Usage: GSAConnectedComponentsExample <vertex path> <edge path> "
+					+ "<result path> <max iterations>");
+		}
+		return true;
+	}
+
+	private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env
+					.readCsvFile(vertexInputPath)
+					.fieldDelimiter(" ")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Vertex<Long, Long>>() {
+						@Override
+						public Vertex<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+							return new Vertex<Long, Long>(value.f0, value.f1);
+						}
+					});
+		}
+		// Default data: vertices 0..5, each initialized with its own ID as its value
+		return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Long>>() {
+			@Override
+			public Vertex<Long, Long> map(Long value) throws Exception {
+				return new Vertex<Long, Long>(value, value);
+			}
+		});
+	}
+
+	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter(" ")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+						@Override
+						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+						}
+					});
+		}
+		// Default data: edges (0,3), (1,4) and (2,5) over vertices 0..5,
+		// i.e. 3 components of size 2
+		return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+			@Override
+			public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
+				out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
+			}
+		});
+	}
+
+	@Override
+	public String getDescription() {
+		return "GSA Connected Components";
+	}
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
deleted file mode 100755
index 4acd86c..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.GatherSumApplyIteration;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.RichEdge;
-import org.apache.flink.util.Collector;
-
-import java.util.HashSet;
-
-/**
- * Greedy Graph Coloring example using a gather-sum-apply iteration. NOTE(review): despite the name, the UDFs below propagate the minimum vertex value (min-label propagation, as in connected components) rather than assigning colors.
- */
-public class GSAGreedyGraphColoringExample implements ProgramDescription {
-
- // --------------------------------------------------------------------------------------------
- // Program
- // --------------------------------------------------------------------------------------------
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
- DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
-
- Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
-
- // Gather: wrap the source vertex value of each edge into a one-element set
- GatherFunction<Double, Double, HashSet<Double>> gather = new GreedyGraphColoringGather();
-
- // Sum: merge the gathered sets by set union
- SumFunction<Double, Double, HashSet<Double>> sum = new GreedyGraphColoringSum();
-
- // Apply: adopt the minimum value in the merged set if it is smaller than the vertex's own value
- ApplyFunction<Double, Double, HashSet<Double>> apply = new GreedyGraphColoringApply();
-
- // Execute the GSA iteration
- GatherSumApplyIteration<Long, Double, Double, HashSet<Double>> iteration =
- graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
- Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(iteration);
-
- // Extract the vertices as the result
- DataSet<Vertex<Long, Double>> greedyGraphColoring = result.getVertices();
-
- // emit result
- if (fileOutput) {
- greedyGraphColoring.writeAsCsv(outputPath, "\n", " ");
- } else {
- greedyGraphColoring.print();
- }
-
- env.execute("GSA Greedy Graph Coloring");
- }
-
- // --------------------------------------------------------------------------------------------
- // Greedy Graph Coloring UDFs
- // --------------------------------------------------------------------------------------------
-
- private static final class GreedyGraphColoringGather
- extends GatherFunction<Double, Double, HashSet<Double>> {
- @Override
- public HashSet<Double> gather(RichEdge<Double, Double> richEdge) {
- // One-element set holding the value of the edge's source vertex
- HashSet<Double> result = new HashSet<Double>();
- result.add(richEdge.getSrcVertexValue());
-
- return result;
- }
- };
-
- private static final class GreedyGraphColoringSum
- extends SumFunction<Double, Double, HashSet<Double>> {
- @Override
- public HashSet<Double> sum(HashSet<Double> newValue, HashSet<Double> currentValue) {
- // Union into a fresh set so neither input set is mutated
- HashSet<Double> result = new HashSet<Double>();
- result.addAll(newValue);
- result.addAll(currentValue);
-
- return result;
- }
- };
-
- private static final class GreedyGraphColoringApply
- extends ApplyFunction<Double, Double, HashSet<Double>> {
- @Override
- public void apply(HashSet<Double> set, Double src) {
- double minValue = src;
- for (Double d : set) {
- if (d < minValue) {
- minValue = d;
- }
- }
- // minValue is now the smallest of the vertex's own value (src) and all gathered values
- // This is the condition that enables the termination of the iteration
- if (minValue < src) {
- setResult(minValue);
- }
- }
- };
-
- // --------------------------------------------------------------------------------------------
- // Util methods
- // --------------------------------------------------------------------------------------------
-
- private static boolean fileOutput = false;
- private static String vertexInputPath = null;
- private static String edgeInputPath = null;
- private static String outputPath = null;
-
- private static int maxIterations = 16;
-
- private static boolean parseParameters(String[] args) {
-
- if(args.length > 0) {
- // parse input arguments
- fileOutput = true;
-
- if(args.length != 4) {
- System.err.println("Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> " +
- "<result path> <max iterations>");
- return false;
- }
-
- vertexInputPath = args[0];
- edgeInputPath = args[1];
- outputPath = args[2];
- maxIterations = Integer.parseInt(args[3]);
- } else {
- System.out.println("Executing GSA Greedy Graph Coloring example with built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> "
- + "<result path> <max iterations>");
- }
- return true;
- }
-
- private static DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
- return env
- .readCsvFile(vertexInputPath)
- .fieldDelimiter(" ")
- .lineDelimiter("\n")
- .types(Long.class, Double.class)
- .map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
- @Override
- public Vertex<Long, Double> map(Tuple2<Long, Double> value) throws Exception {
- return new Vertex<Long, Double>(value.f0, value.f1);
- }
- });
- }
- // Default data: vertices 0..5, each with its own ID (as a double) as its value
- return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Double>>() {
- @Override
- public Vertex<Long, Double> map(Long value) throws Exception {
- return new Vertex<Long, Double>(value, (double) value);
- }
- });
- }
-
- private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .fieldDelimiter(" ")
- .lineDelimiter("\n")
- .types(Long.class, Long.class, Double.class)
- .map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
- @Override
- public Edge<Long, Double> map(Tuple3<Long, Long, Double> value) throws Exception {
- return new Edge<Long, Double>(value.f0, value.f1, value.f2);
- }
- });
- }
- // Default data: each vertex i in 0..5 has edges to (i+1)%6 and (i+2)%6, all with value 0.0
- return env.generateSequence(0, 5).flatMap(new FlatMapFunction<Long, Edge<Long, Double>>() {
- @Override
- public void flatMap(Long value, Collector<Edge<Long, Double>> out) throws Exception {
- out.collect(new Edge<Long, Double>(value, (value + 1) % 6, 0.0));
- out.collect(new Edge<Long, Double>(value, (value + 2) % 6, 0.0));
- }
- });
- }
-
- @Override
- public String getDescription() {
- return "GSA Greedy Graph Coloring";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
index 9befccb..8d4b4d8 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
@@ -23,7 +23,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
import java.io.Serializable;
/**
- * A wrapper around Tuple3<VV, EV, VV> for convenience in the GatherFunction
+ * This class represents a {@code <sourceVertex, edge>} pair.
+ * It is a wrapper around {@code Tuple2<VV, EV>} for convenience in the GatherFunction.
* @param <VV> the vertex value type
* @param <EV> the edge value type
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 321d530..0f5fe47 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.graph.test;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
-import org.apache.flink.graph.example.GSAGreedyGraphColoringExample;
+import org.apache.flink.graph.example.GSAConnectedComponentsExample;
import org.apache.flink.graph.example.GSASingleSourceShortestPathsExample;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.After;
@@ -68,17 +68,18 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
}
// --------------------------------------------------------------------------------------------
- // Greedy Graph Coloring Test
+ // Connected Components Test
// --------------------------------------------------------------------------------------------
@Test
- public void testGreedyGraphColoring() throws Exception {
+ public void testConnectedComponents() throws Exception {
- GSAGreedyGraphColoringExample.main(new String[] {verticesPath, edgesPath, resultPath, "16"});
- expectedResult = "1 1.0\n" +
- "2 1.0\n" +
- "3 1.0\n" +
- "4 1.0\n" +
- "5 1.0\n";
+ GSAConnectedComponentsExample.main(new String[]{verticesPath, edgesPath, resultPath, "16"});
+ expectedResult = "1 1\n" +
+ "2 1\n" +
+ "3 1\n" +
+ "4 1\n" +
+ "5 1\n" +
+ "6 6\n"; // vertex 6 is expected to keep its own ID as its component ID
}
@@ -93,7 +94,8 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
"2 12.0\n" +
"3 13.0\n" +
"4 47.0\n" +
- "5 48.0\n";
+ "5 48.0\n" +
+ "6 Infinity\n";
}
@@ -101,11 +103,12 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
// Sample data
// --------------------------------------------------------------------------------------------
- private static final String VERTICES = "1 1.0\n" +
- "2 2.0\n" +
- "3 3.0\n" +
- "4 4.0\n" +
- "5 5.0\n";
+ private static final String VERTICES = "1 1\n" +
+ "2 2\n" +
+ "3 3\n" +
+ "4 4\n" +
+ "5 5\n" +
+ "6 6\n";
private static final String EDGES = "1 2 12.0\n" +
"1 3 13.0\n" +