You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:01:04 UTC
[1/9] flink git commit: [FLINK-1514] [gelly] Unified create and run
for GSA iterations
Repository: flink
Updated Branches:
refs/heads/master b2aafe585 -> 6e24879b9
[FLINK-1514] [gelly] Unified create and run for GSA iterations
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/740d437a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/740d437a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/740d437a
Branch: refs/heads/master
Commit: 740d437a3d07f322d5883b840f21a01090ccea26
Parents: e1f56e9
Author: Dániel Bali <ba...@gmail.com>
Authored: Fri Apr 17 16:28:55 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 19:52:23 2015 +0200
----------------------------------------------------------------------
.../main/java/org/apache/flink/graph/Graph.java | 30 +++++++-
.../example/GSAConnectedComponentsExample.java | 64 ++++++----------
.../GSASingleSourceShortestPathsExample.java | 79 ++++++++------------
.../graph/gsa/GatherSumApplyIteration.java | 42 -----------
.../flink/graph/test/GatherSumApplyITCase.java | 25 ++-----
5 files changed, 91 insertions(+), 149 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/740d437a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 2a66aca..d564f24 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -43,6 +43,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -50,11 +51,11 @@ import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.GatherSumApplyIteration;
import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.spargel.IterationConfiguration;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.graph.utils.EdgeToTuple3Map;
-import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
import org.apache.flink.graph.utils.VertexToTuple2Map;
@@ -351,7 +352,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
@Override
public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet,
- Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
+ Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
collector.collect(new Triplet<K, VV, EV>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
tripletWithSrcValSet.f2, vertex.getValue(), tripletWithSrcValSet.f3));
@@ -1205,6 +1206,31 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
}
+ /**
+ * Runs a Gather-Sum-Apply iteration on the graph.
+ * No configuration options are provided.
+ *
+ * @param gatherFunction the gather function collects information about adjacent vertices and edges
+ * @param sumFunction the sum function aggregates the gathered information
+ * @param applyFunction the apply function updates the vertex values with the aggregates
+ * @param maximumNumberOfIterations maximum number of iterations to perform
+ * @param <M> the intermediate type used between gather, sum and apply
+ *
+ * @return the updated Graph after the gather-sum-apply iteration has converged or
+ * after maximumNumberOfIterations.
+ */
+ public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
+ GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
+ ApplyFunction<VV, EV, M> applyFunction, int maximumNumberOfIterations) {
+
+ GatherSumApplyIteration<K, VV, EV, M> iteration = GatherSumApplyIteration.withEdges(
+ edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);
+
+ DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
+
+ return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
+ }
+
public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) throws Exception {
return algorithm.run(this);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/740d437a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index 88938b5..d338b03 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -29,14 +29,13 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.GatherSumApplyIteration;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.gsa.RichEdge;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
/**
- * This is an implementation of the connected components algorithm, using a gather-sum-apply iteration
+ * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration
*/
public class GSAConnectedComponentsExample implements ProgramDescription {
@@ -52,8 +51,8 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+ DataSet<Vertex<Long, Long>> vertices = edges.flatMap(new InitVerticesMapper()).distinct();
Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
@@ -67,9 +66,8 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
ApplyFunction<Long, NullValue, Long> apply = new ConnectedComponentsApply();
// Execute the GSA iteration
- GatherSumApplyIteration<Long, Long, NullValue, Long> iteration =
- graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
- Graph<Long, Long, NullValue> result = graph.runGatherSumApplyIteration(iteration);
+ Graph<Long, Long, NullValue> result =
+ graph.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
// Extract the vertices as the result
DataSet<Vertex<Long, Long>> greedyGraphColoring = result.getVertices();
@@ -84,6 +82,16 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
env.execute("GSA Connected Components");
}
+ private static final class InitVerticesMapper
+ implements FlatMapFunction<Edge<Long, NullValue>, Vertex<Long, Long>>{
+
+ @Override
+ public void flatMap(Edge<Long, NullValue> edge, Collector<Vertex<Long, Long>> out) throws Exception {
+ out.collect(new Vertex<Long, Long>(edge.getSource(), edge.getSource()));
+ out.collect(new Vertex<Long, Long>(edge.getTarget(), edge.getTarget()));
+ }
+ }
+
// --------------------------------------------------------------------------------------------
// Connected Components UDFs
// --------------------------------------------------------------------------------------------
@@ -122,7 +130,6 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
// --------------------------------------------------------------------------------------------
private static boolean fileOutput = false;
- private static String vertexInputPath = null;
private static String edgeInputPath = null;
private static String outputPath = null;
@@ -130,55 +137,30 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
private static boolean parseParameters(String[] args) {
- if(args.length > 0) {
+ if (args.length > 0) {
// parse input arguments
fileOutput = true;
- if(args.length != 4) {
- System.err.println("Usage: GSAConnectedComponentsExample <vertex path> <edge path> " +
+ if (args.length != 3) {
+ System.err.println("Usage: GSAConnectedComponentsExample <edge path> " +
"<result path> <max iterations>");
return false;
}
- vertexInputPath = args[0];
- edgeInputPath = args[1];
- outputPath = args[2];
- maxIterations = Integer.parseInt(args[3]);
+ edgeInputPath = args[0];
+ outputPath = args[1];
+ maxIterations = Integer.parseInt(args[2]);
} else {
System.out.println("Executing GSA Connected Components example with built-in default data.");
System.out.println(" Provide parameters to read input data from files.");
System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: GSAConnectedComponentsExample <vertex path> <edge path> "
- + "<result path> <max iterations>");
+ System.out.println(" Usage: GSAConnectedComponentsExample <edge path> <result path> <max iterations>");
}
return true;
}
- private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
- return env
- .readCsvFile(vertexInputPath)
- .fieldDelimiter(" ")
- .lineDelimiter("\n")
- .types(Long.class, Long.class)
- .map(new MapFunction<Tuple2<Long, Long>, Vertex<Long, Long>>() {
- @Override
- public Vertex<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
- return new Vertex<Long, Long>(value.f0, value.f1);
- }
- });
- }
-
- return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Long>>() {
- @Override
- public Vertex<Long, Long> map(Long value) throws Exception {
- return new Vertex<Long, Long>(value, value);
- }
- });
- }
-
private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
+ if (fileOutput) {
return env.readCsvFile(edgeInputPath)
.fieldDelimiter(" ")
.lineDelimiter("\n")
@@ -202,7 +184,7 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
@Override
public String getDescription() {
- return "GSA Greedy Graph Coloring";
+ return "GSA Connected Components";
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/740d437a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index 9c8328b..8967a90 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -19,10 +19,10 @@
package org.apache.flink.graph.example;
import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
@@ -30,11 +30,9 @@ import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.GatherSumApplyIteration;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.gsa.RichEdge;
-
-import java.io.Serializable;
+import org.apache.flink.util.Collector;
/**
* This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
@@ -53,8 +51,10 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
- DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+ DataSet<Vertex<Long, Double>> vertices = edges
+ .flatMap(new InitVerticesMapper(srcVertexId))
+ .distinct();
Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
@@ -68,10 +68,8 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
ApplyFunction<Double, Double, Double> apply = new SingleSourceShortestPathApply();
// Execute the GSA iteration
- GatherSumApplyIteration<Long, Double, Double, Double> iteration = graph.createGatherSumApplyIteration(
- gather, sum, apply, maxIterations);
- Graph<Long, Double, Double> result = graph.mapVertices(new InitVerticesMapper<Long>(srcVertexId))
- .runGatherSumApplyIteration(iteration);
+ Graph<Long, Double, Double> result = graph
+ .runGatherSumApplyIteration(gather, sum, apply, maxIterations);
// Extract the vertices as the result
DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
@@ -86,20 +84,27 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
env.execute("GSA Single Source Shortest Paths Example");
}
- public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
- implements MapFunction<Vertex<K, Double>, Double> {
+ private static final class InitVerticesMapper
+ implements FlatMapFunction<Edge<Long, Double>, Vertex<Long, Double>>{
- private K srcVertexId;
+ private long srcVertexId;
- public InitVerticesMapper(K srcId) {
+ public InitVerticesMapper(long srcId) {
this.srcVertexId = srcId;
}
- public Double map(Vertex<K, Double> value) {
- if (value.f0.equals(srcVertexId)) {
- return 0.0;
+ @Override
+ public void flatMap(Edge<Long, Double> edge, Collector<Vertex<Long, Double>> out) throws Exception {
+ if (edge.getSource() == srcVertexId) {
+ out.collect(new Vertex<Long, Double>(edge.getSource(), 0.0));
+ } else {
+ out.collect(new Vertex<Long, Double>(edge.getSource(), Double.POSITIVE_INFINITY));
+ }
+
+ if (edge.getTarget() == srcVertexId) {
+ out.collect(new Vertex<Long, Double>(edge.getTarget(), 0.0));
} else {
- return Double.POSITIVE_INFINITY;
+ out.collect(new Vertex<Long, Double>(edge.getTarget(), Double.POSITIVE_INFINITY));
}
}
}
@@ -139,7 +144,6 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
// --------------------------------------------------------------------------------------------
private static boolean fileOutput = false;
- private static String vertexInputPath = null;
private static String edgeInputPath = null;
private static String outputPath = null;
@@ -148,51 +152,32 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
private static boolean parseParameters(String[] args) {
- if(args.length > 0) {
+ if (args.length > 0) {
// parse input arguments
fileOutput = true;
- if(args.length != 5) {
- System.err.println("Usage: GSASingleSourceShortestPathsExample <vertex path> <edge path> " +
+ if (args.length != 4) {
+ System.err.println("Usage: GSASingleSourceShortestPathsExample <edge path> " +
"<result path> <src vertex> <max iterations>");
return false;
}
- vertexInputPath = args[0];
- edgeInputPath = args[1];
- outputPath = args[2];
- srcVertexId = Long.parseLong(args[3]);
- maxIterations = Integer.parseInt(args[4]);
+ edgeInputPath = args[0];
+ outputPath = args[1];
+ srcVertexId = Long.parseLong(args[2]);
+ maxIterations = Integer.parseInt(args[3]);
} else {
System.out.println("Executing GSA Single Source Shortest Paths example with built-in default data.");
System.out.println(" Provide parameters to read input data from files.");
System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: GSASingleSourceShortestPathsExample <vertex path> <edge path> "
- + "<result path> <src vertex> <max iterations>");
+ System.out.println(" Usage: GSASingleSourceShortestPathsExample <edge path> <result path> <src vertex> "
+ + "<max iterations>");
}
return true;
}
- private static DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
- return env
- .readCsvFile(vertexInputPath)
- .fieldDelimiter(" ")
- .lineDelimiter("\n")
- .types(Long.class, Double.class)
- .map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
- @Override
- public Vertex<Long, Double> map(Tuple2<Long, Double> value) throws Exception {
- return new Vertex<Long, Double>(value.f0, value.f1);
- }
- });
- } else {
- return SingleSourceShortestPathsData.getDefaultVertexDataSet(env);
- }
- }
-
private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
+ if (fileOutput) {
return env.readCsvFile(edgeInputPath)
.fieldDelimiter(" ")
.lineDelimiter("\n")
http://git-wip-us.apache.org/repos/asf/flink/blob/740d437a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 426efbb..1adab29 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -57,9 +57,6 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
private final ApplyFunction<VV, EV, M> apply;
private final int maximumNumberOfIterations;
- private String name;
- private int parallelism = -1;
-
// ----------------------------------------------------------------------------------
private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
@@ -78,45 +75,6 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
this.maximumNumberOfIterations = maximumNumberOfIterations;
}
-
- /**
- * Sets the name for the gather-sum-apply iteration. The name is displayed in logs and messages.
- *
- * @param name The name for the iteration.
- */
- public void setName(String name) {
- this.name = name;
- }
-
- /**
- * Gets the name from this gather-sum-apply iteration.
- *
- * @return The name of the iteration.
- */
- public String getName() {
- return name;
- }
-
- /**
- * Sets the degree of parallelism for the iteration.
- *
- * @param parallelism The degree of parallelism.
- */
- public void setParallelism(int parallelism) {
- Validate.isTrue(parallelism > 0 || parallelism == -1,
- "The degree of parallelism must be positive, or -1 (use default).");
- this.parallelism = parallelism;
- }
-
- /**
- * Gets the iteration's degree of parallelism.
- *
- * @return The iterations parallelism, or -1, if not set.
- */
- public int getParallelism() {
- return parallelism;
- }
-
// --------------------------------------------------------------------------------------------
// Custom Operator behavior
// --------------------------------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/740d437a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 0f5fe47..43377d6 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -40,7 +40,6 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
super(mode);
}
- private String verticesPath;
private String edgesPath;
private String resultPath;
private String expectedResult;
@@ -51,13 +50,10 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
@Before
public void before() throws Exception{
resultPath = tempFolder.newFile().toURI().toString();
- File verticesFile = tempFolder.newFile();
- Files.write(GatherSumApplyITCase.VERTICES, verticesFile, Charsets.UTF_8);
File edgesFile = tempFolder.newFile();
Files.write(GatherSumApplyITCase.EDGES, edgesFile, Charsets.UTF_8);
- verticesPath = verticesFile.toURI().toString();
edgesPath = edgesFile.toURI().toString();
}
@@ -73,13 +69,14 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
@Test
public void testGreedyGraphColoring() throws Exception {
- GSAConnectedComponentsExample.main(new String[]{verticesPath, edgesPath, resultPath, "16"});
+ GSAConnectedComponentsExample.main(new String[]{edgesPath, resultPath, "16"});
expectedResult = "1 1\n" +
"2 1\n" +
"3 1\n" +
"4 1\n" +
"5 1\n" +
- "6 6\n";
+ "6 6\n" +
+ "7 6\n";
}
@@ -89,13 +86,14 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
@Test
public void testSingleSourceShortestPath() throws Exception {
- GSASingleSourceShortestPathsExample.main(new String[]{verticesPath, edgesPath, resultPath, "1", "16"});
+ GSASingleSourceShortestPathsExample.main(new String[]{edgesPath, resultPath, "1", "16"});
expectedResult = "1 0.0\n" +
"2 12.0\n" +
"3 13.0\n" +
"4 47.0\n" +
"5 48.0\n" +
- "6 Infinity\n";
+ "6 Infinity\n" +
+ "7 Infinity\n";
}
@@ -103,19 +101,12 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
// Sample data
// --------------------------------------------------------------------------------------------
- private static final String VERTICES = "1 1\n" +
- "2 2\n" +
- "3 3\n" +
- "4 4\n" +
- "5 5\n" +
- "6 6\n";
-
private static final String EDGES = "1 2 12.0\n" +
"1 3 13.0\n" +
"2 3 23.0\n" +
"3 4 34.0\n" +
"3 5 35.0\n" +
"4 5 45.0\n" +
- "5 1 51.0\n";
-
+ "5 1 51.0\n" +
+ "6 7 67.0\n";
}
[5/9] flink git commit: [FLINK-1514] [gelly] improvements to the GSA
SSSP example; improvements to the GSA Connected Components example
Posted by va...@apache.org.
[FLINK-1514] [gelly] improvements to the GSA SSSP example;
improvements to the GSA Connected Components example
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/921414d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/921414d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/921414d6
Branch: refs/heads/master
Commit: 921414d6e3582cb42820475ef22444b3eea26c96
Parents: 837508d
Author: vasia <va...@apache.org>
Authored: Tue Apr 21 01:06:13 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 20:00:11 2015 +0200
----------------------------------------------------------------------
.../example/GSAConnectedComponentsExample.java | 54 ++++-----
.../GSASingleSourceShortestPathsExample.java | 118 ++++++++-----------
2 files changed, 68 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/921414d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index 6a2c250..fca7b1d 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -52,22 +52,13 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
- DataSet<Vertex<Long, Long>> vertices = edges.flatMap(new InitVerticesMapper()).distinct();
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
-
- // Simply return the vertex value of each vertex
- GatherFunction<Long, NullValue, Long> gather = new ConnectedComponentsGather();
-
- // Select the lower value among neighbors
- SumFunction<Long, NullValue, Long> sum = new ConnectedComponentsSum();
-
- // Set the lower value for each vertex
- ApplyFunction<Long, NullValue, Long> apply = new ConnectedComponentsApply();
+ Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
// Execute the GSA iteration
Graph<Long, Long, NullValue> result =
- graph.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
+ graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
+ new UpdateComponentId(), maxIterations);
// Extract the vertices as the result
DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
@@ -82,13 +73,11 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
env.execute("GSA Connected Components");
}
- private static final class InitVerticesMapper
- implements FlatMapFunction<Edge<Long, NullValue>, Vertex<Long, Long>>{
+ @SuppressWarnings("serial")
+ private static final class InitVertices implements MapFunction<Long, Long> {
- @Override
- public void flatMap(Edge<Long, NullValue> edge, Collector<Vertex<Long, Long>> out) throws Exception {
- out.collect(new Vertex<Long, Long>(edge.getSource(), edge.getSource()));
- out.collect(new Vertex<Long, Long>(edge.getTarget(), edge.getTarget()));
+ public Long map(Long vertexId) {
+ return vertexId;
}
}
@@ -96,29 +85,26 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
// Connected Components UDFs
// --------------------------------------------------------------------------------------------
- private static final class ConnectedComponentsGather
- extends GatherFunction<Long, NullValue, Long> {
- @Override
- public Long gather(Neighbor<Long, NullValue> richEdge) {
+ @SuppressWarnings("serial")
+ private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
- return richEdge.getSrcVertexValue();
+ public Long gather(Neighbor<Long, NullValue> neighbor) {
+ return neighbor.getSrcVertexValue();
}
};
- private static final class ConnectedComponentsSum
- extends SumFunction<Long, NullValue, Long> {
- @Override
- public Long sum(Long newValue, Long currentValue) {
+ @SuppressWarnings("serial")
+ private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
+ public Long sum(Long newValue, Long currentValue) {
return Math.min(newValue, currentValue);
}
};
- private static final class ConnectedComponentsApply
- extends ApplyFunction<Long, NullValue, Long> {
- @Override
- public void apply(Long summedValue, Long origValue) {
+ @SuppressWarnings("serial")
+ private static final class UpdateComponentId extends ApplyFunction<Long, NullValue, Long> {
+ public void apply(Long summedValue, Long origValue) {
if (summedValue < origValue) {
setResult(summedValue);
}
@@ -159,14 +145,15 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
return true;
}
+ @SuppressWarnings("serial")
private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
if (fileOutput) {
return env.readCsvFile(edgeInputPath)
- .fieldDelimiter(" ")
+ .fieldDelimiter("\t")
.lineDelimiter("\n")
.types(Long.class, Long.class)
.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
- @Override
+
public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
}
@@ -186,5 +173,4 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
public String getDescription() {
return "GSA Connected Components";
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/921414d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index cc3b054..488c66c 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -19,11 +19,9 @@
package org.apache.flink.graph.example;
import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
@@ -32,7 +30,7 @@ import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.util.Collector;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
/**
* This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
@@ -52,24 +50,13 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
- DataSet<Vertex<Long, Double>> vertices = edges
- .flatMap(new InitVerticesMapper(srcVertexId))
- .distinct();
- Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
-
- // The path from src to trg through edge e costs src + e
- GatherFunction<Double, Double, Double> gather = new SingleSourceShortestPathGather();
-
- // Return the smaller path length to minimize distance
- SumFunction<Double, Double, Double> sum = new SingleSourceShortestPathSum();
-
- // Iterate as long as the distance is updated
- ApplyFunction<Double, Double, Double> apply = new SingleSourceShortestPathApply();
+ Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
// Execute the GSA iteration
Graph<Long, Double, Double> result = graph
- .runGatherSumApplyIteration(gather, sum, apply, maxIterations);
+ .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
+ new UpdateDistance(), maxIterations);
// Extract the vertices as the result
DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
@@ -84,27 +71,21 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
env.execute("GSA Single Source Shortest Paths Example");
}
- private static final class InitVerticesMapper
- implements FlatMapFunction<Edge<Long, Double>, Vertex<Long, Double>>{
+ @SuppressWarnings("serial")
+ private static final class InitVertices implements MapFunction<Long, Double>{
- private long srcVertexId;
+ private long srcId;
- public InitVerticesMapper(long srcId) {
- this.srcVertexId = srcId;
+ public InitVertices(long srcId) {
+ this.srcId = srcId;
}
- @Override
- public void flatMap(Edge<Long, Double> edge, Collector<Vertex<Long, Double>> out) throws Exception {
- if (edge.getSource() == srcVertexId) {
- out.collect(new Vertex<Long, Double>(edge.getSource(), 0.0));
- } else {
- out.collect(new Vertex<Long, Double>(edge.getSource(), Double.POSITIVE_INFINITY));
+ public Double map(Long id) {
+ if (id.equals(srcId)) {
+ return 0.0;
}
-
- if (edge.getTarget() == srcVertexId) {
- out.collect(new Vertex<Long, Double>(edge.getTarget(), 0.0));
- } else {
- out.collect(new Vertex<Long, Double>(edge.getTarget(), Double.POSITIVE_INFINITY));
+ else {
+ return Double.MAX_VALUE;
}
}
}
@@ -113,28 +94,28 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
// Single Source Shortest Path UDFs
// --------------------------------------------------------------------------------------------
- private static final class SingleSourceShortestPathGather
- extends GatherFunction<Double, Double, Double> {
- @Override
+ @SuppressWarnings("serial")
+ private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
+
public Double gather(Neighbor<Double, Double> richEdge) {
return richEdge.getSrcVertexValue() + richEdge.getEdgeValue();
}
};
- private static final class SingleSourceShortestPathSum
- extends SumFunction<Double, Double, Double> {
- @Override
+ @SuppressWarnings("serial")
+ private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
+
public Double sum(Double newValue, Double currentValue) {
return Math.min(newValue, currentValue);
}
};
- private static final class SingleSourceShortestPathApply
- extends ApplyFunction<Double, Double, Double> {
- @Override
- public void apply(Double summed, Double target) {
- if (summed < target) {
- setResult(summed);
+ @SuppressWarnings("serial")
+ private static final class UpdateDistance extends ApplyFunction<Double, Double, Double> {
+
+ public void apply(Double newDistance, Double oldDistance) {
+ if (newDistance < oldDistance) {
+ setResult(newDistance);
}
}
};
@@ -144,50 +125,47 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
// --------------------------------------------------------------------------------------------
private static boolean fileOutput = false;
- private static String edgeInputPath = null;
+
+ private static Long srcVertexId = 1l;
+
+ private static String edgesInputPath = null;
+
private static String outputPath = null;
- private static int maxIterations = 2;
- private static long srcVertexId = 1;
+ private static int maxIterations = 5;
private static boolean parseParameters(String[] args) {
if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
-
- if (args.length != 4) {
- System.err.println("Usage: GSASingleSourceShortestPathsExample <edge path> " +
- "<result path> <src vertex> <max iterations>");
+ if(args.length != 4) {
+ System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+ " <input edges path> <output path> <num iterations>");
return false;
}
- edgeInputPath = args[0];
- outputPath = args[1];
- srcVertexId = Long.parseLong(args[2]);
+ fileOutput = true;
+ srcVertexId = Long.parseLong(args[0]);
+ edgesInputPath = args[1];
+ outputPath = args[2];
maxIterations = Integer.parseInt(args[3]);
} else {
- System.out.println("Executing GSA Single Source Shortest Paths example with built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: GSASingleSourceShortestPathsExample <edge path> <result path> <src vertex> "
- + "<max iterations>");
+ System.out.println("Executing GSASingle Source Shortest Paths example "
+ + "with default parameters and built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+ " <input edges path> <output path> <num iterations>");
}
return true;
}
private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
if (fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .fieldDelimiter(" ")
+ return env.readCsvFile(edgesInputPath)
+ .fieldDelimiter("\t")
.lineDelimiter("\n")
.types(Long.class, Long.class, Double.class)
- .map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
- @Override
- public Edge<Long, Double> map(Tuple3<Long, Long, Double> value) throws Exception {
- return new Edge<Long, Double>(value.f0, value.f1, value.f2);
- }
- });
+ .map(new Tuple3ToEdgeMap<Long, Double>());
} else {
return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
}
[8/9] flink git commit: [FLINK-1514] [gelly] Simplified GatherUdf;
added forwarded fields annotations
Posted by va...@apache.org.
[FLINK-1514] [gelly] Simplified GatherUdf; added forwarded fields annotations
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40f5f3a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40f5f3a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40f5f3a0
Branch: refs/heads/master
Commit: 40f5f3a0160df708ecc78356853138e640b59da5
Parents: 66d72ac
Author: vasia <va...@apache.org>
Authored: Thu Apr 23 21:56:56 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 20:00:59 2015 +0200
----------------------------------------------------------------------
.../graph/gsa/GatherSumApplyIteration.java | 36 +++++++++++++-------
1 file changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/40f5f3a0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 992f840..7fcd427 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -19,11 +19,14 @@
package org.apache.flink.graph.gsa;
import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.RichFlatJoinFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.functions.RichReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.operators.JoinOperator;
@@ -119,11 +122,9 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
// Prepare the neighbors
- DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> neighbors = iteration
- .getWorkset()
- .join(edgeDataSet)
- .where(0)
- .equalTo(0);
+ DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors = iteration
+ .getWorkset().join(edgeDataSet)
+ .where(0).equalTo(0).with(new ProjectKeyWithNeighbor<K, VV, EV>());
// Gather, sum and apply
DataSet<Tuple2<K, M>> gatheredSet = neighbors.map(gatherUdf);
@@ -170,8 +171,9 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
// --------------------------------------------------------------------------------------------
@SuppressWarnings("serial")
+ @ForwardedFields("f0")
private static final class GatherUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
- EV extends Serializable, M> extends RichMapFunction<Tuple2<Vertex<K, VV>, Edge<K, EV>>,
+ EV extends Serializable, M> extends RichMapFunction<Tuple2<K, Neighbor<VV, EV>>,
Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
private final GatherFunction<VV, EV, M> gatherFunction;
@@ -183,13 +185,9 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
}
@Override
- public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> neighborTuple) throws Exception {
- Neighbor<VV, EV> neighbor = new Neighbor<VV, EV>(neighborTuple.f0.getValue(),
- neighborTuple.f1.getValue());
-
- K key = neighborTuple.f1.getTarget();
- M result = this.gatherFunction.gather(neighbor);
- return new Tuple2<K, M>(key, result);
+ public Tuple2<K, M> map(Tuple2<K, Neighbor<VV, EV>> neighborTuple) {
+ M result = this.gatherFunction.gather(neighborTuple.f1);
+ return new Tuple2<K, M>(neighborTuple.f0, result);
}
@Override
@@ -302,4 +300,16 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
}
}
+ @SuppressWarnings("serial")
+ @ForwardedFieldsSecond("f1->f0")
+ private static final class ProjectKeyWithNeighbor<K extends Comparable<K> & Serializable,
+ VV extends Serializable, EV extends Serializable> implements FlatJoinFunction<
+ Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
+
+ public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
+ out.collect(new Tuple2<K, Neighbor<VV, EV>>(
+ edge.getTarget(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
+ }
+ }
+
}
[2/9] flink git commit: [FLINK-1514] [gelly] Removed GGC example,
added Connected Components instead
Posted by va...@apache.org.
[FLINK-1514] [gelly] Removed GGC example, added Connected Components instead
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1f56e9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1f56e9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1f56e9d
Branch: refs/heads/master
Commit: e1f56e9d767347c5c69f943921cc0cac29708036
Parents: 722719f
Author: Dániel Bali <ba...@gmail.com>
Authored: Fri Mar 20 15:35:53 2015 +0100
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 19:52:23 2015 +0200
----------------------------------------------------------------------
.../example/GSAConnectedComponentsExample.java | 208 +++++++++++++++++
.../example/GSAGreedyGraphColoringExample.java | 224 -------------------
.../org/apache/flink/graph/gsa/RichEdge.java | 3 +-
.../flink/graph/test/GatherSumApplyITCase.java | 31 +--
4 files changed, 227 insertions(+), 239 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
new file mode 100755
index 0000000..88938b5
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * This is an implementation of the connected components algorithm, using a gather-sum-apply iteration
+ */
+public class GSAConnectedComponentsExample implements ProgramDescription {
+
+ // --------------------------------------------------------------------------------------------
+ // Program
+ // --------------------------------------------------------------------------------------------
+
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
+ DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+
+ Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
+
+ // Simply return the vertex value of each vertex
+ GatherFunction<Long, NullValue, Long> gather = new ConnectedComponentsGather();
+
+ // Select the lower value among neighbors
+ SumFunction<Long, NullValue, Long> sum = new ConnectedComponentsSum();
+
+ // Set the lower value for each vertex
+ ApplyFunction<Long, NullValue, Long> apply = new ConnectedComponentsApply();
+
+ // Execute the GSA iteration
+ GatherSumApplyIteration<Long, Long, NullValue, Long> iteration =
+ graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
+ Graph<Long, Long, NullValue> result = graph.runGatherSumApplyIteration(iteration);
+
+ // Extract the vertices as the result
+ DataSet<Vertex<Long, Long>> greedyGraphColoring = result.getVertices();
+
+ // emit result
+ if (fileOutput) {
+ greedyGraphColoring.writeAsCsv(outputPath, "\n", " ");
+ } else {
+ greedyGraphColoring.print();
+ }
+
+ env.execute("GSA Connected Components");
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Connected Components UDFs
+ // --------------------------------------------------------------------------------------------
+
+ private static final class ConnectedComponentsGather
+ extends GatherFunction<Long, NullValue, Long> {
+ @Override
+ public Long gather(RichEdge<Long, NullValue> richEdge) {
+
+ return richEdge.getSrcVertexValue();
+ }
+ };
+
+ private static final class ConnectedComponentsSum
+ extends SumFunction<Long, NullValue, Long> {
+ @Override
+ public Long sum(Long newValue, Long currentValue) {
+
+ return Math.min(newValue, currentValue);
+ }
+ };
+
+ private static final class ConnectedComponentsApply
+ extends ApplyFunction<Long, NullValue, Long> {
+ @Override
+ public void apply(Long summedValue, Long origValue) {
+
+ if (summedValue < origValue) {
+ setResult(summedValue);
+ }
+ }
+ };
+
+ // --------------------------------------------------------------------------------------------
+ // Util methods
+ // --------------------------------------------------------------------------------------------
+
+ private static boolean fileOutput = false;
+ private static String vertexInputPath = null;
+ private static String edgeInputPath = null;
+ private static String outputPath = null;
+
+ private static int maxIterations = 16;
+
+ private static boolean parseParameters(String[] args) {
+
+ if(args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+
+ if(args.length != 4) {
+ System.err.println("Usage: GSAConnectedComponentsExample <vertex path> <edge path> " +
+ "<result path> <max iterations>");
+ return false;
+ }
+
+ vertexInputPath = args[0];
+ edgeInputPath = args[1];
+ outputPath = args[2];
+ maxIterations = Integer.parseInt(args[3]);
+ } else {
+ System.out.println("Executing GSA Connected Components example with built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println(" Usage: GSAConnectedComponentsExample <vertex path> <edge path> "
+ + "<result path> <max iterations>");
+ }
+ return true;
+ }
+
+ private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ return env
+ .readCsvFile(vertexInputPath)
+ .fieldDelimiter(" ")
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class)
+ .map(new MapFunction<Tuple2<Long, Long>, Vertex<Long, Long>>() {
+ @Override
+ public Vertex<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+ return new Vertex<Long, Long>(value.f0, value.f1);
+ }
+ });
+ }
+
+ return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Long>>() {
+ @Override
+ public Vertex<Long, Long> map(Long value) throws Exception {
+ return new Vertex<Long, Long>(value, value);
+ }
+ });
+ }
+
+ private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ return env.readCsvFile(edgeInputPath)
+ .fieldDelimiter(" ")
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class)
+ .map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+ @Override
+ public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+ return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+ }
+ });
+ }
+
+ // Generates 3 components of size 2
+ return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+ @Override
+ public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
+ out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
+ }
+ });
+ }
+
+ @Override
+ public String getDescription() {
+ return "GSA Greedy Graph Coloring";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
deleted file mode 100755
index 4acd86c..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.GatherSumApplyIteration;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.RichEdge;
-import org.apache.flink.util.Collector;
-
-import java.util.HashSet;
-
-/**
- * This is an implementation of the Greedy Graph Coloring algorithm, using a gather-sum-apply iteration
- */
-public class GSAGreedyGraphColoringExample implements ProgramDescription {
-
- // --------------------------------------------------------------------------------------------
- // Program
- // --------------------------------------------------------------------------------------------
-
- public static void main(String[] args) throws Exception {
-
- if (!parseParameters(args)) {
- return;
- }
-
- ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
- DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
-
- Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
-
- // Gather the target vertices into a one-element set
- GatherFunction<Double, Double, HashSet<Double>> gather = new GreedyGraphColoringGather();
-
- // Merge the sets between neighbors
- SumFunction<Double, Double, HashSet<Double>> sum = new GreedyGraphColoringSum();
-
- // Find the minimum vertex id in the set which will be propagated
- ApplyFunction<Double, Double, HashSet<Double>> apply = new GreedyGraphColoringApply();
-
- // Execute the GSA iteration
- GatherSumApplyIteration<Long, Double, Double, HashSet<Double>> iteration =
- graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
- Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(iteration);
-
- // Extract the vertices as the result
- DataSet<Vertex<Long, Double>> greedyGraphColoring = result.getVertices();
-
- // emit result
- if (fileOutput) {
- greedyGraphColoring.writeAsCsv(outputPath, "\n", " ");
- } else {
- greedyGraphColoring.print();
- }
-
- env.execute("GSA Greedy Graph Coloring");
- }
-
- // --------------------------------------------------------------------------------------------
- // Greedy Graph Coloring UDFs
- // --------------------------------------------------------------------------------------------
-
- private static final class GreedyGraphColoringGather
- extends GatherFunction<Double, Double, HashSet<Double>> {
- @Override
- public HashSet<Double> gather(RichEdge<Double, Double> richEdge) {
-
- HashSet<Double> result = new HashSet<Double>();
- result.add(richEdge.getSrcVertexValue());
-
- return result;
- }
- };
-
- private static final class GreedyGraphColoringSum
- extends SumFunction<Double, Double, HashSet<Double>> {
- @Override
- public HashSet<Double> sum(HashSet<Double> newValue, HashSet<Double> currentValue) {
-
- HashSet<Double> result = new HashSet<Double>();
- result.addAll(newValue);
- result.addAll(currentValue);
-
- return result;
- }
- };
-
- private static final class GreedyGraphColoringApply
- extends ApplyFunction<Double, Double, HashSet<Double>> {
- @Override
- public void apply(HashSet<Double> set, Double src) {
- double minValue = src;
- for (Double d : set) {
- if (d < minValue) {
- minValue = d;
- }
- }
-
- // This is the condition that enables the termination of the iteration
- if (minValue < src) {
- setResult(minValue);
- }
- }
- };
-
- // --------------------------------------------------------------------------------------------
- // Util methods
- // --------------------------------------------------------------------------------------------
-
- private static boolean fileOutput = false;
- private static String vertexInputPath = null;
- private static String edgeInputPath = null;
- private static String outputPath = null;
-
- private static int maxIterations = 16;
-
- private static boolean parseParameters(String[] args) {
-
- if(args.length > 0) {
- // parse input arguments
- fileOutput = true;
-
- if(args.length != 4) {
- System.err.println("Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> " +
- "<result path> <max iterations>");
- return false;
- }
-
- vertexInputPath = args[0];
- edgeInputPath = args[1];
- outputPath = args[2];
- maxIterations = Integer.parseInt(args[3]);
- } else {
- System.out.println("Executing GSA Greedy Graph Coloring example with built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> "
- + "<result path> <max iterations>");
- }
- return true;
- }
-
- private static DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
- return env
- .readCsvFile(vertexInputPath)
- .fieldDelimiter(" ")
- .lineDelimiter("\n")
- .types(Long.class, Double.class)
- .map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
- @Override
- public Vertex<Long, Double> map(Tuple2<Long, Double> value) throws Exception {
- return new Vertex<Long, Double>(value.f0, value.f1);
- }
- });
- }
-
- return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Double>>() {
- @Override
- public Vertex<Long, Double> map(Long value) throws Exception {
- return new Vertex<Long, Double>(value, (double) value);
- }
- });
- }
-
- private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
- if(fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .fieldDelimiter(" ")
- .lineDelimiter("\n")
- .types(Long.class, Long.class, Double.class)
- .map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
- @Override
- public Edge<Long, Double> map(Tuple3<Long, Long, Double> value) throws Exception {
- return new Edge<Long, Double>(value.f0, value.f1, value.f2);
- }
- });
- }
-
- return env.generateSequence(0, 5).flatMap(new FlatMapFunction<Long, Edge<Long, Double>>() {
- @Override
- public void flatMap(Long value, Collector<Edge<Long, Double>> out) throws Exception {
- out.collect(new Edge<Long, Double>(value, (value + 1) % 6, 0.0));
- out.collect(new Edge<Long, Double>(value, (value + 2) % 6, 0.0));
- }
- });
- }
-
- @Override
- public String getDescription() {
- return "GSA Greedy Graph Coloring";
- }
-
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
index 9befccb..8d4b4d8 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
@@ -23,7 +23,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
import java.io.Serializable;
/**
- * A wrapper around Tuple3<VV, EV, VV> for convenience in the GatherFunction
+ * This class represents a <sourceVertex, edge> pair
+ * This is a wrapper around Tuple2<VV, EV> for convenience in the GatherFunction
* @param <VV> the vertex value type
* @param <EV> the edge value type
*/
http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 321d530..0f5fe47 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.graph.test;
import com.google.common.base.Charsets;
import com.google.common.io.Files;
-import org.apache.flink.graph.example.GSAGreedyGraphColoringExample;
+import org.apache.flink.graph.example.GSAConnectedComponentsExample;
import org.apache.flink.graph.example.GSASingleSourceShortestPathsExample;
import org.apache.flink.test.util.MultipleProgramsTestBase;
import org.junit.After;
@@ -68,17 +68,18 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
}
// --------------------------------------------------------------------------------------------
- // Greedy Graph Coloring Test
+ // Connected Components Test
// --------------------------------------------------------------------------------------------
@Test
public void testGreedyGraphColoring() throws Exception {
- GSAGreedyGraphColoringExample.main(new String[] {verticesPath, edgesPath, resultPath, "16"});
- expectedResult = "1 1.0\n" +
- "2 1.0\n" +
- "3 1.0\n" +
- "4 1.0\n" +
- "5 1.0\n";
+ GSAConnectedComponentsExample.main(new String[]{verticesPath, edgesPath, resultPath, "16"});
+ expectedResult = "1 1\n" +
+ "2 1\n" +
+ "3 1\n" +
+ "4 1\n" +
+ "5 1\n" +
+ "6 6\n";
}
@@ -93,7 +94,8 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
"2 12.0\n" +
"3 13.0\n" +
"4 47.0\n" +
- "5 48.0\n";
+ "5 48.0\n" +
+ "6 Infinity\n";
}
@@ -101,11 +103,12 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
// Sample data
// --------------------------------------------------------------------------------------------
- private static final String VERTICES = "1 1.0\n" +
- "2 2.0\n" +
- "3 3.0\n" +
- "4 4.0\n" +
- "5 5.0\n";
+ private static final String VERTICES = "1 1\n" +
+ "2 2\n" +
+ "3 3\n" +
+ "4 4\n" +
+ "5 5\n" +
+ "6 6\n";
private static final String EDGES = "1 2 12.0\n" +
"1 3 13.0\n" +
[7/9] flink git commit: [FLINK-1514] [gelly] suppressed warnings in
several places; adjusted tests
Posted by va...@apache.org.
[FLINK-1514] [gelly] suppressed warnings in several places; adjusted tests
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e3165ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e3165ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e3165ee
Branch: refs/heads/master
Commit: 4e3165eec508c320f8c4f1fef7afe78697a64fad
Parents: 921414d
Author: vasia <va...@apache.org>
Authored: Tue Apr 21 01:31:50 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 20:00:59 2015 +0200
----------------------------------------------------------------------
.../GSASingleSourceShortestPathsExample.java | 2 +-
.../apache/flink/graph/gsa/ApplyFunction.java | 2 ++
.../apache/flink/graph/gsa/GatherFunction.java | 2 ++
.../graph/gsa/GatherSumApplyIteration.java | 3 +++
.../org/apache/flink/graph/gsa/Neighbor.java | 1 +
.../org/apache/flink/graph/gsa/SumFunction.java | 2 ++
.../flink/graph/test/GatherSumApplyITCase.java | 20 ++++++++++----------
7 files changed, 21 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index 488c66c..b4f1f34 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -85,7 +85,7 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
return 0.0;
}
else {
- return Double.MAX_VALUE;
+ return Double.POSITIVE_INFINITY;
}
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index d863da1..ab64c92 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.Collector;
import java.io.Serializable;
+@SuppressWarnings("serial")
public abstract class ApplyFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
public abstract void apply(M message, VV vertexValue);
@@ -54,6 +55,7 @@ public abstract class ApplyFunction<VV extends Serializable, EV extends Serializ
// Internal methods
// --------------------------------------------------------------------------------------------
+ @SuppressWarnings("unused")
private IterationRuntimeContext runtimeContext;
private Collector<VV> out;
http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index 0d110d5..0b0caf9 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
import java.io.Serializable;
+@SuppressWarnings("serial")
public abstract class GatherFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
public abstract M gather(Neighbor<VV, EV> neighbor);
@@ -44,6 +45,7 @@ public abstract class GatherFunction<VV extends Serializable, EV extends Seriali
// Internal methods
// --------------------------------------------------------------------------------------------
+ @SuppressWarnings("unused")
private IterationRuntimeContext runtimeContext;
public void init(IterationRuntimeContext iterationRuntimeContext) {
http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 67ae094..53fa7a4 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -165,6 +165,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
// Wrapping UDFs
// --------------------------------------------------------------------------------------------
+ @SuppressWarnings("serial")
private static final class GatherUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
EV extends Serializable, M> extends RichMapFunction<Tuple2<Vertex<K, VV>, Edge<K, EV>>,
Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
@@ -206,6 +207,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
}
}
+ @SuppressWarnings("serial")
private static final class SumUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
EV extends Serializable, M> extends RichReduceFunction<Tuple2<K, M>>
implements ResultTypeQueryable<Tuple2<K, M>>{
@@ -244,6 +246,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
}
}
+ @SuppressWarnings("serial")
private static final class ApplyUdf<K extends Comparable<K> & Serializable,
VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction<Tuple2<K, M>,
Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
index 2260022..39f18d5 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
* @param <VV> the vertex value type
* @param <EV> the edge value type
*/
+@SuppressWarnings("serial")
public class Neighbor<VV extends Serializable, EV extends Serializable>
extends Tuple2<VV, EV> {
http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
index b616194..6ed85c4 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
import java.io.Serializable;
+@SuppressWarnings("serial")
public abstract class SumFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
public abstract M sum(M arg0, M arg1);
@@ -44,6 +45,7 @@ public abstract class SumFunction<VV extends Serializable, EV extends Serializab
// Internal methods
// --------------------------------------------------------------------------------------------
+ @SuppressWarnings("unused")
private IterationRuntimeContext runtimeContext;
public void init(IterationRuntimeContext iterationRuntimeContext) {
http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 43377d6..23ebfa5 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -68,7 +68,7 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
// --------------------------------------------------------------------------------------------
@Test
- public void testGreedyGraphColoring() throws Exception {
+ public void testConnectedComponents() throws Exception {
GSAConnectedComponentsExample.main(new String[]{edgesPath, resultPath, "16"});
expectedResult = "1 1\n" +
"2 1\n" +
@@ -86,7 +86,7 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
@Test
public void testSingleSourceShortestPath() throws Exception {
- GSASingleSourceShortestPathsExample.main(new String[]{edgesPath, resultPath, "1", "16"});
+ GSASingleSourceShortestPathsExample.main(new String[]{"1", edgesPath, resultPath, "16"});
expectedResult = "1 0.0\n" +
"2 12.0\n" +
"3 13.0\n" +
@@ -101,12 +101,12 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
// Sample data
// --------------------------------------------------------------------------------------------
- private static final String EDGES = "1 2 12.0\n" +
- "1 3 13.0\n" +
- "2 3 23.0\n" +
- "3 4 34.0\n" +
- "3 5 35.0\n" +
- "4 5 45.0\n" +
- "5 1 51.0\n" +
- "6 7 67.0\n";
+ private static final String EDGES = "1 2 12.0\n" +
+ "1 3 13.0\n" +
+ "2 3 23.0\n" +
+ "3 4 34.0\n" +
+ "3 5 35.0\n" +
+ "4 5 45.0\n" +
+ "5 1 51.0\n" +
+ "6 7 67.0\n";
}
[9/9] flink git commit: [FLINK-1514] [gelly] Removed edge value type
from ApplyFunction; Reuse the output vertex
Posted by va...@apache.org.
[FLINK-1514] [gelly] Removed edge value type from ApplyFunction; Reuse the output vertex
This closes #408
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e24879b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e24879b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e24879b
Branch: refs/heads/master
Commit: 6e24879b96bcd5eee9f1b4af504eced11a901123
Parents: 40f5f3a
Author: vasia <va...@apache.org>
Authored: Fri Apr 24 00:16:37 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sun Apr 26 13:26:02 2015 +0200
----------------------------------------------------------------------
flink-staging/flink-gelly/pom.xml | 0
.../main/java/org/apache/flink/graph/Graph.java | 4 +--
.../example/GSAConnectedComponentsExample.java | 2 +-
.../GSASingleSourceShortestPathsExample.java | 2 +-
.../apache/flink/graph/gsa/ApplyFunction.java | 16 ++++++----
.../graph/gsa/GatherSumApplyIteration.java | 31 ++++++--------------
6 files changed, 24 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
old mode 100755
new mode 100644
http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 330c951..f843827 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -214,7 +214,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
return new Vertex<K, VV>(value.f0, mapper.map(value.f0));
}
- }).returns(returnType);
+ }).returns(returnType).withForwardedFields("f0");
return new Graph<K, VV, EV>(vertices, edges, context);
}
@@ -1213,7 +1213,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
*/
public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
- ApplyFunction<VV, EV, M> applyFunction, int maximumNumberOfIterations) {
+ ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {
GatherSumApplyIteration<K, VV, EV, M> iteration = GatherSumApplyIteration.withEdges(
edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);
http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index dba28cd..7c39123 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -102,7 +102,7 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
};
@SuppressWarnings("serial")
- private static final class UpdateComponentId extends ApplyFunction<Long, NullValue, Long> {
+ private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
public void apply(Long summedValue, Long origValue) {
if (summedValue < origValue) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index 166a3d3..75cbd78 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -111,7 +111,7 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
};
@SuppressWarnings("serial")
- private static final class UpdateDistance extends ApplyFunction<Double, Double, Double> {
+ private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
public void apply(Double newDistance, Double oldDistance) {
if (newDistance < oldDistance) {
http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index ab64c92..75f64f9 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -19,14 +19,16 @@
package org.apache.flink.graph.gsa;
import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.graph.Vertex;
import org.apache.flink.util.Collector;
import java.io.Serializable;
@SuppressWarnings("serial")
-public abstract class ApplyFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
+public abstract class ApplyFunction<K extends Comparable<K> & Serializable, VV extends Serializable, M>
+ implements Serializable {
- public abstract void apply(M message, VV vertexValue);
+ public abstract void apply(M newValue, VV currentValue);
/**
* Sets the result for the apply function
@@ -34,7 +36,8 @@ public abstract class ApplyFunction<VV extends Serializable, EV extends Serializ
* @param result the result of the apply phase
*/
public void setResult(VV result) {
- out.collect(result);
+ outVal.f1 = result;
+ out.collect(outVal);
}
/**
@@ -58,14 +61,17 @@ public abstract class ApplyFunction<VV extends Serializable, EV extends Serializ
@SuppressWarnings("unused")
private IterationRuntimeContext runtimeContext;
- private Collector<VV> out;
+ private Collector<Vertex<K, VV>> out;
+
+ private Vertex<K, VV> outVal;
public void init(IterationRuntimeContext iterationRuntimeContext) {
this.runtimeContext = iterationRuntimeContext;
};
- public void setOutput(Collector<VV> out) {
+ public void setOutput(Vertex<K, VV> vertex, Collector<Vertex<K, VV>> out) {
this.out = out;
+ this.outVal = vertex;
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 7fcd427..22be591 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -58,13 +58,13 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
private final GatherFunction<VV, EV, M> gather;
private final SumFunction<VV, EV, M> sum;
- private final ApplyFunction<VV, EV, M> apply;
+ private final ApplyFunction<K, VV, M> apply;
private final int maximumNumberOfIterations;
// ----------------------------------------------------------------------------------
private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
- ApplyFunction<VV, EV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
+ ApplyFunction<K, VV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
Validate.notNull(gather);
Validate.notNull(sum);
@@ -161,7 +161,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
*/
public static final <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, M>
GatherSumApplyIteration<K, VV, EV, M> withEdges(DataSet<Edge<K, EV>> edges,
- GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<VV, EV, M> apply,
+ GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply,
int maximumNumberOfIterations) {
return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
}
@@ -253,32 +253,19 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction<Tuple2<K, M>,
Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {
- private final ApplyFunction<VV, EV, M> applyFunction;
+ private final ApplyFunction<K, VV, M> applyFunction;
private transient TypeInformation<Vertex<K, VV>> resultType;
- private ApplyUdf(ApplyFunction<VV, EV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
+ private ApplyUdf(ApplyFunction<K, VV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
this.applyFunction = applyFunction;
this.resultType = resultType;
}
@Override
- public void join(Tuple2<K, M> arg0, Vertex<K, VV> arg1, final Collector<Vertex<K, VV>> out) throws Exception {
-
- final K key = arg1.getId();
- Collector<VV> userOut = new Collector<VV>() {
- @Override
- public void collect(VV record) {
- out.collect(new Vertex<K, VV>(key, record));
- }
-
- @Override
- public void close() {
- out.close();
- }
- };
-
- this.applyFunction.setOutput(userOut);
- this.applyFunction.apply(arg0.f1, arg1.getValue());
+ public void join(Tuple2<K, M> newValue, final Vertex<K, VV> currentValue, final Collector<Vertex<K, VV>> out) throws Exception {
+
+ this.applyFunction.setOutput(currentValue, out);
+ this.applyFunction.apply(newValue.f1, currentValue.getValue());
}
@Override
[3/9] flink git commit: [FLINK-1514] [gelly] Add a Gather-Sum-Apply
iteration method
Posted by va...@apache.org.
[FLINK-1514] [gelly] Add a Gather-Sum-Apply iteration method
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/722719f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/722719f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/722719f2
Branch: refs/heads/master
Commit: 722719f2a2a357962b13beffc8f52ca3031e9926
Parents: b2aafe5
Author: Dániel Bali <ba...@gmail.com>
Authored: Mon Feb 16 15:06:05 2015 +0100
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 19:52:23 2015 +0200
----------------------------------------------------------------------
flink-staging/flink-gelly/pom.xml | 31 ++
.../main/java/org/apache/flink/graph/Graph.java | 25 +-
.../example/GSAGreedyGraphColoringExample.java | 224 ++++++++++++
.../GSASingleSourceShortestPathsExample.java | 215 ++++++++++++
.../apache/flink/graph/gsa/ApplyFunction.java | 69 ++++
.../apache/flink/graph/gsa/GatherFunction.java | 52 +++
.../graph/gsa/GatherSumApplyIteration.java | 340 +++++++++++++++++++
.../org/apache/flink/graph/gsa/RichEdge.java | 46 +++
.../org/apache/flink/graph/gsa/SumFunction.java | 52 +++
.../flink/graph/test/GatherSumApplyITCase.java | 118 +++++++
10 files changed, 1165 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
index a36ab4b..5245667 100644
--- a/flink-staging/flink-gelly/pom.xml
+++ b/flink-staging/flink-gelly/pom.xml
@@ -57,4 +57,35 @@ under the License.
<version>${guava.version}</version>
</dependency>
</dependencies>
+
+ <!-- See main pom.xml for explanation of profiles -->
+ <profiles>
+ <profile>
+ <id>hadoop-1</id>
+ <activation>
+ <property>
+ <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+ <!--hadoop1--><name>hadoop.profile</name><value>1</value>
+ </property>
+ </activation>
+ <dependencies>
+ <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>${guava.version}</version>
+ <scope>provided</scope>
+ </dependency>
+ </dependencies>
+ </profile>
+ <profile>
+ <id>hadoop-2</id>
+ <activation>
+ <property>
+ <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+ <!--hadoop2--><name>!hadoop.profile</name>
+ </property>
+ </activation>
+ </profile>
+ </profiles>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
old mode 100644
new mode 100755
index 62173e3..2a66aca
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -43,15 +43,18 @@ import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.graph.spargel.IterationConfiguration;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.spargel.MessagingFunction;
import org.apache.flink.graph.spargel.VertexCentricIteration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
import org.apache.flink.graph.utils.EdgeToTuple3Map;
+import org.apache.flink.graph.utils.GraphUtils;
import org.apache.flink.graph.utils.Tuple2ToVertexMap;
import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
import org.apache.flink.graph.utils.VertexToTuple2Map;
@@ -79,7 +82,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
private final DataSet<Edge<K, EV>> edges;
/**
- * Creates a graph from two DataSets: vertices and edges
+ * Creates a graph from two DataSets: vertices and edges, and allows setting
+ * the undirected property
*
* @param vertices a DataSet of vertices.
* @param edges a DataSet of edges.
@@ -347,7 +351,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
@Override
public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet,
- Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
+ Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
collector.collect(new Triplet<K, VV, EV>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
tripletWithSrcValSet.f2, vertex.getValue(), tripletWithSrcValSet.f3));
@@ -914,7 +918,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
/**
- * @return a long integer representing the number of edges
+ * @return the number of edges in the graph, as a long
*/
public long numberOfEdges() throws Exception {
return edges.count();
@@ -1011,6 +1015,13 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
}
+ private static final class CheckIfOneComponentMapper implements MapFunction<Integer, Boolean> {
+ @Override
+ public Boolean map(Integer n) {
+ return (n == 1);
+ }
+ }
+
/**
* Adds the input vertex and edges to the graph. If the vertex already
* exists in the graph, it will not be added again, but the given edges
@@ -1165,7 +1176,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
int maximumNumberOfIterations) {
return this.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
- maximumNumberOfIterations, null);
+ maximumNumberOfIterations, null);
}
/**
@@ -1397,4 +1408,4 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class, function.getClass(), 3, null, null);
}
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
new file mode 100755
index 0000000..4acd86c
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
@@ -0,0 +1,224 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.util.Collector;
+
+import java.util.HashSet;
+
+/**
+ * This is an implementation of the Greedy Graph Coloring algorithm, using a gather-sum-apply iteration
+ */
+public class GSAGreedyGraphColoringExample implements ProgramDescription {
+
+ // --------------------------------------------------------------------------------------------
+ // Program
+ // --------------------------------------------------------------------------------------------
+
+ public static void main(String[] args) throws Exception {
+
+ if (!parseParameters(args)) {
+ return;
+ }
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
+ DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+
+ Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+ // Gather the source vertex value of each edge into a one-element set
+ GatherFunction<Double, Double, HashSet<Double>> gather = new GreedyGraphColoringGather();
+
+ // Merge the sets between neighbors
+ SumFunction<Double, Double, HashSet<Double>> sum = new GreedyGraphColoringSum();
+
+ // Find the minimum vertex id in the set which will be propagated
+ ApplyFunction<Double, Double, HashSet<Double>> apply = new GreedyGraphColoringApply();
+
+ // Execute the GSA iteration
+ GatherSumApplyIteration<Long, Double, Double, HashSet<Double>> iteration =
+ graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
+ Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(iteration);
+
+ // Extract the vertices as the result
+ DataSet<Vertex<Long, Double>> greedyGraphColoring = result.getVertices();
+
+ // emit result
+ if (fileOutput) {
+ greedyGraphColoring.writeAsCsv(outputPath, "\n", " ");
+ } else {
+ greedyGraphColoring.print();
+ }
+
+ env.execute("GSA Greedy Graph Coloring");
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Greedy Graph Coloring UDFs
+ // --------------------------------------------------------------------------------------------
+
+ private static final class GreedyGraphColoringGather
+ extends GatherFunction<Double, Double, HashSet<Double>> {
+ @Override
+ public HashSet<Double> gather(RichEdge<Double, Double> richEdge) {
+
+ HashSet<Double> result = new HashSet<Double>();
+ result.add(richEdge.getSrcVertexValue());
+
+ return result;
+ }
+ };
+
+ private static final class GreedyGraphColoringSum
+ extends SumFunction<Double, Double, HashSet<Double>> {
+ @Override
+ public HashSet<Double> sum(HashSet<Double> newValue, HashSet<Double> currentValue) {
+
+ HashSet<Double> result = new HashSet<Double>();
+ result.addAll(newValue);
+ result.addAll(currentValue);
+
+ return result;
+ }
+ };
+
+ private static final class GreedyGraphColoringApply
+ extends ApplyFunction<Double, Double, HashSet<Double>> {
+ @Override
+ public void apply(HashSet<Double> set, Double src) {
+ double minValue = src;
+ for (Double d : set) {
+ if (d < minValue) {
+ minValue = d;
+ }
+ }
+
+ // This is the condition that enables the termination of the iteration
+ if (minValue < src) {
+ setResult(minValue);
+ }
+ }
+ };
+
+ // --------------------------------------------------------------------------------------------
+ // Util methods
+ // --------------------------------------------------------------------------------------------
+
+ private static boolean fileOutput = false;
+ private static String vertexInputPath = null;
+ private static String edgeInputPath = null;
+ private static String outputPath = null;
+
+ private static int maxIterations = 16;
+
+ private static boolean parseParameters(String[] args) {
+
+ if(args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+
+ if(args.length != 4) {
+ System.err.println("Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> " +
+ "<result path> <max iterations>");
+ return false;
+ }
+
+ vertexInputPath = args[0];
+ edgeInputPath = args[1];
+ outputPath = args[2];
+ maxIterations = Integer.parseInt(args[3]);
+ } else {
+ System.out.println("Executing GSA Greedy Graph Coloring example with built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println(" Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> "
+ + "<result path> <max iterations>");
+ }
+ return true;
+ }
+
+ private static DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ return env
+ .readCsvFile(vertexInputPath)
+ .fieldDelimiter(" ")
+ .lineDelimiter("\n")
+ .types(Long.class, Double.class)
+ .map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
+ @Override
+ public Vertex<Long, Double> map(Tuple2<Long, Double> value) throws Exception {
+ return new Vertex<Long, Double>(value.f0, value.f1);
+ }
+ });
+ }
+
+ return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Double>>() {
+ @Override
+ public Vertex<Long, Double> map(Long value) throws Exception {
+ return new Vertex<Long, Double>(value, (double) value);
+ }
+ });
+ }
+
+ private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ return env.readCsvFile(edgeInputPath)
+ .fieldDelimiter(" ")
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class, Double.class)
+ .map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
+ @Override
+ public Edge<Long, Double> map(Tuple3<Long, Long, Double> value) throws Exception {
+ return new Edge<Long, Double>(value.f0, value.f1, value.f2);
+ }
+ });
+ }
+
+ return env.generateSequence(0, 5).flatMap(new FlatMapFunction<Long, Edge<Long, Double>>() {
+ @Override
+ public void flatMap(Long value, Collector<Edge<Long, Double>> out) throws Exception {
+ out.collect(new Edge<Long, Double>(value, (value + 1) % 6, 0.0));
+ out.collect(new Edge<Long, Double>(value, (value + 2) % 6, 0.0));
+ }
+ });
+ }
+
+ @Override
+ public String getDescription() {
+ return "GSA Greedy Graph Coloring";
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
new file mode 100755
index 0000000..9c8328b
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.RichEdge;
+
+import java.io.Serializable;
+
+/**
+ * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
+ */
+public class GSASingleSourceShortestPathsExample implements ProgramDescription {
+
+ // --------------------------------------------------------------------------------------------
+ // Program
+ // --------------------------------------------------------------------------------------------
+
+ public static void main(String[] args) throws Exception {
+
+ if(!parseParameters(args)) {
+ return;
+ }
+
+ ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+ DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
+ DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+
+ Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+ // The path from src to trg through edge e costs src + e
+ GatherFunction<Double, Double, Double> gather = new SingleSourceShortestPathGather();
+
+ // Return the smaller path length to minimize distance
+ SumFunction<Double, Double, Double> sum = new SingleSourceShortestPathSum();
+
+ // Iterate as long as the distance is updated
+ ApplyFunction<Double, Double, Double> apply = new SingleSourceShortestPathApply();
+
+ // Execute the GSA iteration
+ GatherSumApplyIteration<Long, Double, Double, Double> iteration = graph.createGatherSumApplyIteration(
+ gather, sum, apply, maxIterations);
+ Graph<Long, Double, Double> result = graph.mapVertices(new InitVerticesMapper<Long>(srcVertexId))
+ .runGatherSumApplyIteration(iteration);
+
+ // Extract the vertices as the result
+ DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
+
+ // emit result
+ if(fileOutput) {
+ singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
+ } else {
+ singleSourceShortestPaths.print();
+ }
+
+ env.execute("GSA Single Source Shortest Paths Example");
+ }
+
+ public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
+ implements MapFunction<Vertex<K, Double>, Double> {
+
+ private K srcVertexId;
+
+ public InitVerticesMapper(K srcId) {
+ this.srcVertexId = srcId;
+ }
+
+ public Double map(Vertex<K, Double> value) {
+ if (value.f0.equals(srcVertexId)) {
+ return 0.0;
+ } else {
+ return Double.POSITIVE_INFINITY;
+ }
+ }
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Single Source Shortest Path UDFs
+ // --------------------------------------------------------------------------------------------
+
+ private static final class SingleSourceShortestPathGather
+ extends GatherFunction<Double, Double, Double> {
+ @Override
+ public Double gather(RichEdge<Double, Double> richEdge) {
+ return richEdge.getSrcVertexValue() + richEdge.getEdgeValue();
+ }
+ };
+
+ private static final class SingleSourceShortestPathSum
+ extends SumFunction<Double, Double, Double> {
+ @Override
+ public Double sum(Double newValue, Double currentValue) {
+ return Math.min(newValue, currentValue);
+ }
+ };
+
+ private static final class SingleSourceShortestPathApply
+ extends ApplyFunction<Double, Double, Double> {
+ @Override
+ public void apply(Double summed, Double target) {
+ if (summed < target) {
+ setResult(summed);
+ }
+ }
+ };
+
+ // --------------------------------------------------------------------------------------------
+ // Util methods
+ // --------------------------------------------------------------------------------------------
+
+ private static boolean fileOutput = false;
+ private static String vertexInputPath = null;
+ private static String edgeInputPath = null;
+ private static String outputPath = null;
+
+ private static int maxIterations = 2;
+ private static long srcVertexId = 1;
+
+ private static boolean parseParameters(String[] args) {
+
+ if(args.length > 0) {
+ // parse input arguments
+ fileOutput = true;
+
+ if(args.length != 5) {
+ System.err.println("Usage: GSASingleSourceShortestPathsExample <vertex path> <edge path> " +
+ "<result path> <src vertex> <max iterations>");
+ return false;
+ }
+
+ vertexInputPath = args[0];
+ edgeInputPath = args[1];
+ outputPath = args[2];
+ srcVertexId = Long.parseLong(args[3]);
+ maxIterations = Integer.parseInt(args[4]);
+ } else {
+ System.out.println("Executing GSA Single Source Shortest Paths example with built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println(" Usage: GSASingleSourceShortestPathsExample <vertex path> <edge path> "
+ + "<result path> <src vertex> <max iterations>");
+ }
+ return true;
+ }
+
+ private static DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ return env
+ .readCsvFile(vertexInputPath)
+ .fieldDelimiter(" ")
+ .lineDelimiter("\n")
+ .types(Long.class, Double.class)
+ .map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
+ @Override
+ public Vertex<Long, Double> map(Tuple2<Long, Double> value) throws Exception {
+ return new Vertex<Long, Double>(value.f0, value.f1);
+ }
+ });
+ } else {
+ return SingleSourceShortestPathsData.getDefaultVertexDataSet(env);
+ }
+ }
+
+ private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
+ if(fileOutput) {
+ return env.readCsvFile(edgeInputPath)
+ .fieldDelimiter(" ")
+ .lineDelimiter("\n")
+ .types(Long.class, Long.class, Double.class)
+ .map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
+ @Override
+ public Edge<Long, Double> map(Tuple3<Long, Long, Double> value) throws Exception {
+ return new Edge<Long, Double>(value.f0, value.f1, value.f2);
+ }
+ });
+ } else {
+ return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+ }
+ }
+
+ @Override
+ public String getDescription() {
+ return "GSA Single Source Shortest Paths";
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
new file mode 100755
index 0000000..d863da1
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+public abstract class ApplyFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
+
+ public abstract void apply(M message, VV vertexValue);
+
+ /**
+ * Sets the result for the apply function
+ *
+ * @param result the result of the apply phase
+ */
+ public void setResult(VV result) {
+ out.collect(result);
+ }
+
+ /**
+ * This method is executed once per superstep before the apply function is invoked for each vertex.
+ *
+ * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+ */
+ public void preSuperstep() {};
+
+ /**
+ * This method is executed once per superstep after the apply function has been invoked for each vertex.
+ *
+ * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+ */
+ public void postSuperstep() {};
+
+ // --------------------------------------------------------------------------------------------
+ // Internal methods
+ // --------------------------------------------------------------------------------------------
+
+ private IterationRuntimeContext runtimeContext;
+
+ private Collector<VV> out;
+
+ public void init(IterationRuntimeContext iterationRuntimeContext) {
+ this.runtimeContext = iterationRuntimeContext;
+ };
+
+ public void setOutput(Collector<VV> out) {
+ this.out = out;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
new file mode 100755
index 0000000..91a468d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+
+import java.io.Serializable;
+
+public abstract class GatherFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
+
+ public abstract M gather(RichEdge<VV, EV> richEdge);
+
+ /**
+ * This method is executed once per superstep before the gather function is invoked for each edge.
+ *
+ * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+ */
+ public void preSuperstep() {};
+
+ /**
+ * This method is executed once per superstep after the gather function has been invoked for each edge.
+ *
+ * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+ */
+ public void postSuperstep() {};
+
+ // --------------------------------------------------------------------------------------------
+ // Internal methods
+ // --------------------------------------------------------------------------------------------
+
+ private IterationRuntimeContext runtimeContext;
+
+ public void init(IterationRuntimeContext iterationRuntimeContext) {
+ this.runtimeContext = iterationRuntimeContext;
+ };
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
new file mode 100755
index 0000000..426efbb
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -0,0 +1,340 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * This class represents iterative graph computations, programmed in a gather-sum-apply perspective.
+ *
+ * @param <K> The type of the vertex key in the graph
+ * @param <VV> The type of the vertex value in the graph
+ * @param <EV> The type of the edge value in the graph
+ * @param <M> The intermediate type used by the gather, sum and apply functions
+ */
+public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
+ VV extends Serializable, EV extends Serializable, M> implements CustomUnaryOperation<Vertex<K, VV>,
+ Vertex<K, VV>> {
+
+ private DataSet<Vertex<K, VV>> vertexDataSet;
+ private DataSet<Edge<K, EV>> edgeDataSet;
+
+ private final GatherFunction<VV, EV, M> gather;
+ private final SumFunction<VV, EV, M> sum;
+ private final ApplyFunction<VV, EV, M> apply;
+ private final int maximumNumberOfIterations;
+
+ private String name;
+ private int parallelism = -1;
+
+ // ----------------------------------------------------------------------------------
+
+ private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
+ ApplyFunction<VV, EV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
+
+ Validate.notNull(gather);
+ Validate.notNull(sum);
+ Validate.notNull(apply);
+ Validate.notNull(edges);
+ Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+
+ this.gather = gather;
+ this.sum = sum;
+ this.apply = apply;
+ this.edgeDataSet = edges;
+ this.maximumNumberOfIterations = maximumNumberOfIterations;
+ }
+
+
+ /**
+ * Sets the name for the gather-sum-apply iteration. The name is displayed in logs and messages.
+ *
+ * @param name The name for the iteration.
+ */
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * Gets the name from this gather-sum-apply iteration.
+ *
+ * @return The name of the iteration.
+ */
+ public String getName() {
+ return name;
+ }
+
+ /**
+ * Sets the degree of parallelism for the iteration.
+ *
+ * @param parallelism The degree of parallelism.
+ */
+ public void setParallelism(int parallelism) {
+ Validate.isTrue(parallelism > 0 || parallelism == -1,
+ "The degree of parallelism must be positive, or -1 (use default).");
+ this.parallelism = parallelism;
+ }
+
+ /**
+ * Gets the iteration's degree of parallelism.
+ *
+ * @return The iteration's parallelism, or -1 if not set.
+ */
+ public int getParallelism() {
+ return parallelism;
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Custom Operator behavior
+ // --------------------------------------------------------------------------------------------
+
+ /**
+ * Sets the input data set for this operator. In the case of this operator this input data set represents
+ * the set of vertices with their initial state.
+ *
+ * @param dataSet The input data set, which in the case of this operator represents the set of
+ * vertices with their initial state.
+ */
+ @Override
+ public void setInput(DataSet<Vertex<K, VV>> dataSet) {
+ this.vertexDataSet = dataSet;
+ }
+
+ /**
+ * Computes the results of the gather-sum-apply iteration
+ *
+ * @return The resulting DataSet
+ */
+ @Override
+ public DataSet<Vertex<K, VV>> createResult() {
+ if (vertexDataSet == null) {
+ throw new IllegalStateException("The input data set has not been set.");
+ }
+
+ // Prepare type information
+ TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertexDataSet.getType()).getTypeAt(0);
+ TypeInformation<M> messageType = TypeExtractor.createTypeInfo(GatherFunction.class, gather.getClass(), 2, null, null);
+ TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<Tuple2<K, M>>(keyType, messageType);
+ TypeInformation<Vertex<K, VV>> outputType = vertexDataSet.getType();
+
+ // Prepare UDFs
+ GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<K, VV, EV, M>(gather, innerType);
+ SumUdf<K, VV, EV, M> sumUdf = new SumUdf<K, VV, EV, M>(sum, innerType);
+ ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<K, VV, EV, M>(apply, outputType);
+
+ final int[] zeroKeyPos = new int[] {0};
+ final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
+ vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
+
+ // Prepare the rich edges
+ DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> richEdges = iteration
+ .getWorkset()
+ .join(edgeDataSet)
+ .where(0)
+ .equalTo(0);
+
+ // Gather, sum and apply
+ DataSet<Tuple2<K, M>> gatheredSet = richEdges.map(gatherUdf);
+ DataSet<Tuple2<K, M>> summedSet = gatheredSet.groupBy(0).reduce(sumUdf);
+ DataSet<Vertex<K, VV>> appliedSet = summedSet
+ .join(iteration.getSolutionSet())
+ .where(0)
+ .equalTo(0)
+ .with(applyUdf);
+
+ return iteration.closeWith(appliedSet, appliedSet);
+ }
+
+ /**
+ * Creates a new gather-sum-apply iteration operator for graphs
+ *
+ * @param edges The edge DataSet
+ *
+ * @param gather The gather function of the GSA iteration
+ * @param sum The sum function of the GSA iteration
+ * @param apply The apply function of the GSA iteration
+ *
+ * @param maximumNumberOfIterations The maximum number of iterations executed
+ *
+ * @param <K> The type of the vertex key in the graph
+ * @param <VV> The type of the vertex value in the graph
+ * @param <EV> The type of the edge value in the graph
+ * @param <M> The intermediate type used by the gather, sum and apply functions
+ *
+ * @return An instance of the gather-sum-apply graph computation operator.
+ */
+ public static final <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, M>
+ GatherSumApplyIteration<K, VV, EV, M> withEdges(DataSet<Edge<K, EV>> edges,
+ GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<VV, EV, M> apply,
+ int maximumNumberOfIterations) {
+ return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Wrapping UDFs
+ // --------------------------------------------------------------------------------------------
+
+ private static final class GatherUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
+ EV extends Serializable, M> extends RichMapFunction<Tuple2<Vertex<K, VV>, Edge<K, EV>>,
+ Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
+
+ private final GatherFunction<VV, EV, M> gatherFunction;
+ private transient TypeInformation<Tuple2<K, M>> resultType;
+
+ private GatherUdf(GatherFunction<VV, EV, M> gatherFunction, TypeInformation<Tuple2<K, M>> resultType) {
+ this.gatherFunction = gatherFunction;
+ this.resultType = resultType;
+ }
+
+ @Override
+ public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> richEdge) throws Exception {
+ RichEdge<VV, EV> userRichEdge = new RichEdge<VV, EV>(richEdge.f0.getValue(),
+ richEdge.f1.getValue());
+
+ K key = richEdge.f1.getTarget();
+ M result = this.gatherFunction.gather(userRichEdge);
+ return new Tuple2<K, M>(key, result);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+ this.gatherFunction.init(getIterationRuntimeContext());
+ }
+ this.gatherFunction.preSuperstep();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.gatherFunction.postSuperstep();
+ }
+
+ @Override
+ public TypeInformation<Tuple2<K, M>> getProducedType() {
+ return this.resultType;
+ }
+ }
+
+ private static final class SumUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
+ EV extends Serializable, M> extends RichReduceFunction<Tuple2<K, M>>
+ implements ResultTypeQueryable<Tuple2<K, M>>{
+
+ private final SumFunction<VV, EV, M> sumFunction;
+ private transient TypeInformation<Tuple2<K, M>> resultType;
+
+ private SumUdf(SumFunction<VV, EV, M> sumFunction, TypeInformation<Tuple2<K, M>> resultType) {
+ this.sumFunction = sumFunction;
+ this.resultType = resultType;
+ }
+
+ @Override
+ public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
+ K key = arg0.f0;
+ M result = this.sumFunction.sum(arg0.f1, arg1.f1);
+ return new Tuple2<K, M>(key, result);
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+ this.sumFunction.init(getIterationRuntimeContext());
+ }
+ this.sumFunction.preSuperstep();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.sumFunction.postSuperstep();
+ }
+
+ @Override
+ public TypeInformation<Tuple2<K, M>> getProducedType() {
+ return this.resultType;
+ }
+ }
+
+ private static final class ApplyUdf<K extends Comparable<K> & Serializable,
+ VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction<Tuple2<K, M>,
+ Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {
+
+ private final ApplyFunction<VV, EV, M> applyFunction;
+ private transient TypeInformation<Vertex<K, VV>> resultType;
+
+ private ApplyUdf(ApplyFunction<VV, EV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
+ this.applyFunction = applyFunction;
+ this.resultType = resultType;
+ }
+
+ @Override
+ public void join(Tuple2<K, M> arg0, Vertex<K, VV> arg1, final Collector<Vertex<K, VV>> out) throws Exception {
+
+ final K key = arg1.getId();
+ Collector<VV> userOut = new Collector<VV>() {
+ @Override
+ public void collect(VV record) {
+ out.collect(new Vertex<K, VV>(key, record));
+ }
+
+ @Override
+ public void close() {
+ out.close();
+ }
+ };
+
+ this.applyFunction.setOutput(userOut);
+ this.applyFunction.apply(arg0.f1, arg1.getValue());
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+ this.applyFunction.init(getIterationRuntimeContext());
+ }
+ this.applyFunction.preSuperstep();
+ }
+
+ @Override
+ public void close() throws Exception {
+ this.applyFunction.postSuperstep();
+ }
+
+ @Override
+ public TypeInformation<Vertex<K, VV>> getProducedType() {
+ return this.resultType;
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
new file mode 100755
index 0000000..9befccb
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.io.Serializable;
+
+/**
+ * A wrapper around {@code Tuple2<VV, EV>} (source vertex value, edge value) for convenience in the GatherFunction.
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+public class RichEdge<VV extends Serializable, EV extends Serializable>
+ extends Tuple2<VV, EV> {
+
+ public RichEdge() {}
+
+ public RichEdge(VV src, EV edge) {
+ super(src, edge);
+ }
+
+ public VV getSrcVertexValue() {
+ return this.f0;
+ }
+
+ public EV getEdgeValue() {
+ return this.f1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
new file mode 100755
index 0000000..b616194
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+
+import java.io.Serializable;
+
+public abstract class SumFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
+ // Combines two values of type M into a single value of type M; subclasses define the combination.
+ public abstract M sum(M arg0, M arg1);
+
+ /**
+ * This method is executed once per superstep, presumably before {@link #sum} is first invoked (confirm against the calling UDF).
+ * The default implementation does nothing.
+ * NOTE(review): the signature declares no checked exception, so only unchecked exceptions can propagate from here.
+ */
+ public void preSuperstep() {};
+
+ /**
+ * This method is executed once per superstep, presumably after {@link #sum} has been invoked (confirm against the calling UDF).
+ * The default implementation does nothing.
+ * NOTE(review): the signature declares no checked exception, so only unchecked exceptions can propagate from here.
+ */
+ public void postSuperstep() {};
+
+ // --------------------------------------------------------------------------------------------
+ // Internal methods
+ // --------------------------------------------------------------------------------------------
+
+ private IterationRuntimeContext runtimeContext;
+ // Stores the iteration runtime context; the field is not read anywhere in this class.
+ public void init(IterationRuntimeContext iterationRuntimeContext) {
+ this.runtimeContext = iterationRuntimeContext;
+ };
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
new file mode 100755
index 0000000..321d530
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.GSAGreedyGraphColoringExample;
+import org.apache.flink.graph.example.GSASingleSourceShortestPathsExample;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class GatherSumApplyITCase extends MultipleProgramsTestBase {
+
+ public GatherSumApplyITCase(TestExecutionMode mode){
+ super(mode);
+ }
+ // The three paths are assigned in before(); expectedResult is assigned by each test method.
+ private String verticesPath;
+ private String edgesPath;
+ private String resultPath;
+ private String expectedResult;
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+ // Creates temporary result, vertex and edge files and writes the sample data into the latter two.
+ @Before
+ public void before() throws Exception{
+ resultPath = tempFolder.newFile().toURI().toString();
+ File verticesFile = tempFolder.newFile();
+ Files.write(GatherSumApplyITCase.VERTICES, verticesFile, Charsets.UTF_8);
+
+ File edgesFile = tempFolder.newFile();
+ Files.write(GatherSumApplyITCase.EDGES, edgesFile, Charsets.UTF_8);
+
+ verticesPath = verticesFile.toURI().toString();
+ edgesPath = edgesFile.toURI().toString();
+
+ }
+ // Compares the output written to resultPath with the expectedResult set by the test that just ran.
+ @After
+ public void after() throws Exception{
+ compareResultsByLinesInMemory(expectedResult, resultPath);
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Greedy Graph Coloring Test
+ // --------------------------------------------------------------------------------------------
+
+ @Test
+ public void testGreedyGraphColoring() throws Exception {
+ GSAGreedyGraphColoringExample.main(new String[] {verticesPath, edgesPath, resultPath, "16"});
+ expectedResult = "1 1.0\n" +
+ "2 1.0\n" +
+ "3 1.0\n" +
+ "4 1.0\n" +
+ "5 1.0\n";
+
+ }
+
+ // --------------------------------------------------------------------------------------------
+ // Single Source Shortest Path Test
+ // --------------------------------------------------------------------------------------------
+
+ @Test
+ public void testSingleSourceShortestPath() throws Exception {
+ GSASingleSourceShortestPathsExample.main(new String[]{verticesPath, edgesPath, resultPath, "1", "16"});
+ expectedResult = "1 0.0\n" +
+ "2 12.0\n" +
+ "3 13.0\n" +
+ "4 47.0\n" +
+ "5 48.0\n";
+ }
+
+
+ // --------------------------------------------------------------------------------------------
+ // Sample data
+ // --------------------------------------------------------------------------------------------
+ // Space-separated lines; presumably "<vertexId> <value>" and "<source> <target> <edgeValue>" (confirm against the examples).
+ private static final String VERTICES = "1 1.0\n" +
+ "2 2.0\n" +
+ "3 3.0\n" +
+ "4 4.0\n" +
+ "5 5.0\n";
+
+ private static final String EDGES = "1 2 12.0\n" +
+ "1 3 13.0\n" +
+ "2 3 23.0\n" +
+ "3 4 34.0\n" +
+ "3 5 35.0\n" +
+ "4 5 45.0\n" +
+ "5 1 51.0\n";
+
+}
[4/9] flink git commit: [FLINK-1514] [gelly] Fixed inconsistencies
after merge
Posted by va...@apache.org.
[FLINK-1514] [gelly] Fixed inconsistencies after merge
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/837508df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/837508df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/837508df
Branch: refs/heads/master
Commit: 837508df876c77cf22d7b289e76547df6a485bf0
Parents: 740d437
Author: Dániel Bali <ba...@gmail.com>
Authored: Sun Apr 19 21:17:03 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 19:52:24 2015 +0200
----------------------------------------------------------------------
flink-staging/flink-gelly/pom.xml | 31 -------------
.../main/java/org/apache/flink/graph/Graph.java | 12 +----
.../example/GSAConnectedComponentsExample.java | 10 ++---
.../GSASingleSourceShortestPathsExample.java | 4 +-
.../apache/flink/graph/gsa/GatherFunction.java | 2 +-
.../graph/gsa/GatherSumApplyIteration.java | 16 +++----
.../org/apache/flink/graph/gsa/Neighbor.java | 47 ++++++++++++++++++++
.../org/apache/flink/graph/gsa/RichEdge.java | 47 --------------------
8 files changed, 65 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
old mode 100644
new mode 100755
index 5245667..a36ab4b
--- a/flink-staging/flink-gelly/pom.xml
+++ b/flink-staging/flink-gelly/pom.xml
@@ -57,35 +57,4 @@ under the License.
<version>${guava.version}</version>
</dependency>
</dependencies>
-
- <!-- See main pom.xml for explanation of profiles -->
- <profiles>
- <profile>
- <id>hadoop-1</id>
- <activation>
- <property>
- <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
- <!--hadoop1--><name>hadoop.profile</name><value>1</value>
- </property>
- </activation>
- <dependencies>
- <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
- <dependency>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- <version>${guava.version}</version>
- <scope>provided</scope>
- </dependency>
- </dependencies>
- </profile>
- <profile>
- <id>hadoop-2</id>
- <activation>
- <property>
- <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
- <!--hadoop2--><name>!hadoop.profile</name>
- </property>
- </activation>
- </profile>
- </profiles>
</project>
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index d564f24..330c951 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -83,8 +83,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
private final DataSet<Edge<K, EV>> edges;
/**
- * Creates a graph from two DataSets: vertices and edges and allow setting
- * the undirected property
+ * Creates a graph from two DataSets: vertices and edges
*
* @param vertices a DataSet of vertices.
* @param edges a DataSet of edges.
@@ -919,7 +918,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
/**
- * @return Singleton DataSet containing the edge count
+ * @return a long integer representing the number of edges
*/
public long numberOfEdges() throws Exception {
return edges.count();
@@ -1016,13 +1015,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
}
}
- private static final class CheckIfOneComponentMapper implements MapFunction<Integer, Boolean> {
- @Override
- public Boolean map(Integer n) {
- return (n == 1);
- }
- }
-
/**
* Adds the input vertex and edges to the graph. If the vertex already
* exists in the graph, it will not be added again, but the given edges
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index d338b03..6a2c250 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
@@ -70,13 +70,13 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
graph.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
// Extract the vertices as the result
- DataSet<Vertex<Long, Long>> greedyGraphColoring = result.getVertices();
+ DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
// emit result
if (fileOutput) {
- greedyGraphColoring.writeAsCsv(outputPath, "\n", " ");
+ connectedComponents.writeAsCsv(outputPath, "\n", " ");
} else {
- greedyGraphColoring.print();
+ connectedComponents.print();
}
env.execute("GSA Connected Components");
@@ -99,7 +99,7 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
private static final class ConnectedComponentsGather
extends GatherFunction<Long, NullValue, Long> {
@Override
- public Long gather(RichEdge<Long, NullValue> richEdge) {
+ public Long gather(Neighbor<Long, NullValue> richEdge) {
return richEdge.getSrcVertexValue();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index 8967a90..cc3b054 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -31,7 +31,7 @@ import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.util.Collector;
/**
@@ -116,7 +116,7 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
private static final class SingleSourceShortestPathGather
extends GatherFunction<Double, Double, Double> {
@Override
- public Double gather(RichEdge<Double, Double> richEdge) {
+ public Double gather(Neighbor<Double, Double> richEdge) {
return richEdge.getSrcVertexValue() + richEdge.getEdgeValue();
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index 91a468d..0d110d5 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -24,7 +24,7 @@ import java.io.Serializable;
public abstract class GatherFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
- public abstract M gather(RichEdge<VV, EV> richEdge);
+ public abstract M gather(Neighbor<VV, EV> neighbor);
/**
* This method is executed once per superstep before the vertex update function is invoked for each vertex.
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 1adab29..67ae094 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -117,15 +117,15 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
- // Prepare the rich edges
- DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> richEdges = iteration
+ // Prepare the neighbors
+ DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> neighbors = iteration
.getWorkset()
.join(edgeDataSet)
.where(0)
.equalTo(0);
// Gather, sum and apply
- DataSet<Tuple2<K, M>> gatheredSet = richEdges.map(gatherUdf);
+ DataSet<Tuple2<K, M>> gatheredSet = neighbors.map(gatherUdf);
DataSet<Tuple2<K, M>> summedSet = gatheredSet.groupBy(0).reduce(sumUdf);
DataSet<Vertex<K, VV>> appliedSet = summedSet
.join(iteration.getSolutionSet())
@@ -178,12 +178,12 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
}
@Override
- public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> richEdge) throws Exception {
- RichEdge<VV, EV> userRichEdge = new RichEdge<VV, EV>(richEdge.f0.getValue(),
- richEdge.f1.getValue());
+ public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> neighborTuple) throws Exception {
+ Neighbor<VV, EV> neighbor = new Neighbor<VV, EV>(neighborTuple.f0.getValue(),
+ neighborTuple.f1.getValue());
- K key = richEdge.f1.getTarget();
- M result = this.gatherFunction.gather(userRichEdge);
+ K key = neighborTuple.f1.getTarget();
+ M result = this.gatherFunction.gather(neighbor);
return new Tuple2<K, M>(key, result);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
new file mode 100755
index 0000000..2260022
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.io.Serializable;
+
+/**
+ * This class represents a {@code <source vertex value, edge value>} pair.
+ * It is a wrapper around {@code Tuple2<VV, EV>} for convenience in the GatherFunction.
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+public class Neighbor<VV extends Serializable, EV extends Serializable>
+ extends Tuple2<VV, EV> {
+
+ public Neighbor() {}
+
+ public Neighbor(VV src, EV edge) {
+ super(src, edge);
+ }
+
+ public VV getSrcVertexValue() {
+ return this.f0;
+ }
+
+ public EV getEdgeValue() {
+ return this.f1;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
deleted file mode 100755
index 8d4b4d8..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.io.Serializable;
-
-/**
- * This class represents a <sourceVertex, edge> pair
- * This is a wrapper around Tuple2<VV, EV> for convenience in the GatherFunction
- * @param <VV> the vertex value type
- * @param <EV> the edge value type
- */
-public class RichEdge<VV extends Serializable, EV extends Serializable>
- extends Tuple2<VV, EV> {
-
- public RichEdge() {}
-
- public RichEdge(VV src, EV edge) {
- super(src, edge);
- }
-
- public VV getSrcVertexValue() {
- return this.f0;
- }
-
- public EV getEdgeValue() {
- return this.f1;
- }
-}
[6/9] flink git commit: [FLINK-1514] [gelly] renamed fields in
Neighbor class; added forwarded fields hint in GSAIteration
Posted by va...@apache.org.
[FLINK-1514] [gelly] renamed fields in Neighbor class; added forwarded fields hint in GSAIteration
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66d72acd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66d72acd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66d72acd
Branch: refs/heads/master
Commit: 66d72acd23e3006b23348babc256bb6366290c61
Parents: 4e3165e
Author: vasia <va...@apache.org>
Authored: Tue Apr 21 02:51:04 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 20:00:59 2015 +0200
----------------------------------------------------------------------
.../flink/graph/example/GSAConnectedComponentsExample.java | 2 +-
.../graph/example/GSASingleSourceShortestPathsExample.java | 4 ++--
.../org/apache/flink/graph/gsa/GatherSumApplyIteration.java | 6 +++++-
.../src/main/java/org/apache/flink/graph/gsa/Neighbor.java | 6 +++---
4 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/66d72acd/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index fca7b1d..dba28cd 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -89,7 +89,7 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
public Long gather(Neighbor<Long, NullValue> neighbor) {
- return neighbor.getSrcVertexValue();
+ return neighbor.getNeighborValue();
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/66d72acd/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index b4f1f34..166a3d3 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -97,8 +97,8 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
@SuppressWarnings("serial")
private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
- public Double gather(Neighbor<Double, Double> richEdge) {
- return richEdge.getSrcVertexValue() + richEdge.getEdgeValue();
+ public Double gather(Neighbor<Double, Double> neighbor) {
+ return neighbor.getNeighborValue() + neighbor.getEdgeValue();
}
};
http://git-wip-us.apache.org/repos/asf/flink/blob/66d72acd/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 53fa7a4..992f840 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.JoinOperator;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -127,12 +128,15 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
// Gather, sum and apply
DataSet<Tuple2<K, M>> gatheredSet = neighbors.map(gatherUdf);
DataSet<Tuple2<K, M>> summedSet = gatheredSet.groupBy(0).reduce(sumUdf);
- DataSet<Vertex<K, VV>> appliedSet = summedSet
+ JoinOperator<?, ?, Vertex<K, VV>> appliedSet = summedSet
.join(iteration.getSolutionSet())
.where(0)
.equalTo(0)
.with(applyUdf);
+ // let the operator know that we preserve the key field
+ appliedSet.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
+
return iteration.closeWith(appliedSet, appliedSet);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/66d72acd/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
index 39f18d5..5a06af9 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
@@ -34,11 +34,11 @@ public class Neighbor<VV extends Serializable, EV extends Serializable>
public Neighbor() {}
- public Neighbor(VV src, EV edge) {
- super(src, edge);
+ public Neighbor(VV neighborValue, EV edgeValue) {
+ super(neighborValue, edgeValue);
}
- public VV getSrcVertexValue() {
+ public VV getNeighborValue() {
return this.f0;
}