You are viewing a plain text version of this content. (The canonical link was a hyperlink in the original page and is not preserved in this plain-text copy.)
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:01:08 UTC
[5/9] flink git commit: [FLINK-1514] [gelly] improvements to the GSA
SSSP example; improvements to the GSA Connected Components example
[FLINK-1514] [gelly] improvements to the GSA SSSP example;
improvements to the GSA Connected Components example
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/921414d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/921414d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/921414d6
Branch: refs/heads/master
Commit: 921414d6e3582cb42820475ef22444b3eea26c96
Parents: 837508d
Author: vasia <va...@apache.org>
Authored: Tue Apr 21 01:06:13 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 20:00:11 2015 +0200
----------------------------------------------------------------------
.../example/GSAConnectedComponentsExample.java | 54 ++++-----
.../GSASingleSourceShortestPathsExample.java | 118 ++++++++-----------
2 files changed, 68 insertions(+), 104 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/921414d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index 6a2c250..fca7b1d 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -52,22 +52,13 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
- DataSet<Vertex<Long, Long>> vertices = edges.flatMap(new InitVerticesMapper()).distinct();
- Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
-
- // Simply return the vertex value of each vertex
- GatherFunction<Long, NullValue, Long> gather = new ConnectedComponentsGather();
-
- // Select the lower value among neighbors
- SumFunction<Long, NullValue, Long> sum = new ConnectedComponentsSum();
-
- // Set the lower value for each vertex
- ApplyFunction<Long, NullValue, Long> apply = new ConnectedComponentsApply();
+ Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
// Execute the GSA iteration
Graph<Long, Long, NullValue> result =
- graph.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
+ graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
+ new UpdateComponentId(), maxIterations);
// Extract the vertices as the result
DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
@@ -82,13 +73,11 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
env.execute("GSA Connected Components");
}
- private static final class InitVerticesMapper
- implements FlatMapFunction<Edge<Long, NullValue>, Vertex<Long, Long>>{
+ @SuppressWarnings("serial")
+ private static final class InitVertices implements MapFunction<Long, Long> {
- @Override
- public void flatMap(Edge<Long, NullValue> edge, Collector<Vertex<Long, Long>> out) throws Exception {
- out.collect(new Vertex<Long, Long>(edge.getSource(), edge.getSource()));
- out.collect(new Vertex<Long, Long>(edge.getTarget(), edge.getTarget()));
+ public Long map(Long vertexId) {
+ return vertexId;
}
}
@@ -96,29 +85,26 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
// Connected Components UDFs
// --------------------------------------------------------------------------------------------
- private static final class ConnectedComponentsGather
- extends GatherFunction<Long, NullValue, Long> {
- @Override
- public Long gather(Neighbor<Long, NullValue> richEdge) {
+ @SuppressWarnings("serial")
+ private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
- return richEdge.getSrcVertexValue();
+ public Long gather(Neighbor<Long, NullValue> neighbor) {
+ return neighbor.getSrcVertexValue();
}
};
- private static final class ConnectedComponentsSum
- extends SumFunction<Long, NullValue, Long> {
- @Override
- public Long sum(Long newValue, Long currentValue) {
+ @SuppressWarnings("serial")
+ private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
+ public Long sum(Long newValue, Long currentValue) {
return Math.min(newValue, currentValue);
}
};
- private static final class ConnectedComponentsApply
- extends ApplyFunction<Long, NullValue, Long> {
- @Override
- public void apply(Long summedValue, Long origValue) {
+ @SuppressWarnings("serial")
+ private static final class UpdateComponentId extends ApplyFunction<Long, NullValue, Long> {
+ public void apply(Long summedValue, Long origValue) {
if (summedValue < origValue) {
setResult(summedValue);
}
@@ -159,14 +145,15 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
return true;
}
+ @SuppressWarnings("serial")
private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
if (fileOutput) {
return env.readCsvFile(edgeInputPath)
- .fieldDelimiter(" ")
+ .fieldDelimiter("\t")
.lineDelimiter("\n")
.types(Long.class, Long.class)
.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
- @Override
+
public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
}
@@ -186,5 +173,4 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
public String getDescription() {
return "GSA Connected Components";
}
-
}
http://git-wip-us.apache.org/repos/asf/flink/blob/921414d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index cc3b054..488c66c 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -19,11 +19,9 @@
package org.apache.flink.graph.example;
import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
@@ -32,7 +30,7 @@ import org.apache.flink.graph.gsa.ApplyFunction;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.SumFunction;
import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.util.Collector;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
/**
* This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
@@ -52,24 +50,13 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
- DataSet<Vertex<Long, Double>> vertices = edges
- .flatMap(new InitVerticesMapper(srcVertexId))
- .distinct();
- Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
-
- // The path from src to trg through edge e costs src + e
- GatherFunction<Double, Double, Double> gather = new SingleSourceShortestPathGather();
-
- // Return the smaller path length to minimize distance
- SumFunction<Double, Double, Double> sum = new SingleSourceShortestPathSum();
-
- // Iterate as long as the distance is updated
- ApplyFunction<Double, Double, Double> apply = new SingleSourceShortestPathApply();
+ Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
// Execute the GSA iteration
Graph<Long, Double, Double> result = graph
- .runGatherSumApplyIteration(gather, sum, apply, maxIterations);
+ .runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
+ new UpdateDistance(), maxIterations);
// Extract the vertices as the result
DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
@@ -84,27 +71,21 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
env.execute("GSA Single Source Shortest Paths Example");
}
- private static final class InitVerticesMapper
- implements FlatMapFunction<Edge<Long, Double>, Vertex<Long, Double>>{
+ @SuppressWarnings("serial")
+ private static final class InitVertices implements MapFunction<Long, Double>{
- private long srcVertexId;
+ private long srcId;
- public InitVerticesMapper(long srcId) {
- this.srcVertexId = srcId;
+ public InitVertices(long srcId) {
+ this.srcId = srcId;
}
- @Override
- public void flatMap(Edge<Long, Double> edge, Collector<Vertex<Long, Double>> out) throws Exception {
- if (edge.getSource() == srcVertexId) {
- out.collect(new Vertex<Long, Double>(edge.getSource(), 0.0));
- } else {
- out.collect(new Vertex<Long, Double>(edge.getSource(), Double.POSITIVE_INFINITY));
+ public Double map(Long id) {
+ if (id.equals(srcId)) {
+ return 0.0;
}
-
- if (edge.getTarget() == srcVertexId) {
- out.collect(new Vertex<Long, Double>(edge.getTarget(), 0.0));
- } else {
- out.collect(new Vertex<Long, Double>(edge.getTarget(), Double.POSITIVE_INFINITY));
+ else {
+ return Double.MAX_VALUE;
}
}
}
@@ -113,28 +94,28 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
// Single Source Shortest Path UDFs
// --------------------------------------------------------------------------------------------
- private static final class SingleSourceShortestPathGather
- extends GatherFunction<Double, Double, Double> {
- @Override
+ @SuppressWarnings("serial")
+ private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
+
public Double gather(Neighbor<Double, Double> richEdge) {
return richEdge.getSrcVertexValue() + richEdge.getEdgeValue();
}
};
- private static final class SingleSourceShortestPathSum
- extends SumFunction<Double, Double, Double> {
- @Override
+ @SuppressWarnings("serial")
+ private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
+
public Double sum(Double newValue, Double currentValue) {
return Math.min(newValue, currentValue);
}
};
- private static final class SingleSourceShortestPathApply
- extends ApplyFunction<Double, Double, Double> {
- @Override
- public void apply(Double summed, Double target) {
- if (summed < target) {
- setResult(summed);
+ @SuppressWarnings("serial")
+ private static final class UpdateDistance extends ApplyFunction<Double, Double, Double> {
+
+ public void apply(Double newDistance, Double oldDistance) {
+ if (newDistance < oldDistance) {
+ setResult(newDistance);
}
}
};
@@ -144,50 +125,47 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
// --------------------------------------------------------------------------------------------
private static boolean fileOutput = false;
- private static String edgeInputPath = null;
+
+ private static Long srcVertexId = 1l;
+
+ private static String edgesInputPath = null;
+
private static String outputPath = null;
- private static int maxIterations = 2;
- private static long srcVertexId = 1;
+ private static int maxIterations = 5;
private static boolean parseParameters(String[] args) {
if (args.length > 0) {
- // parse input arguments
- fileOutput = true;
-
- if (args.length != 4) {
- System.err.println("Usage: GSASingleSourceShortestPathsExample <edge path> " +
- "<result path> <src vertex> <max iterations>");
+ if(args.length != 4) {
+ System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+ " <input edges path> <output path> <num iterations>");
return false;
}
- edgeInputPath = args[0];
- outputPath = args[1];
- srcVertexId = Long.parseLong(args[2]);
+ fileOutput = true;
+ srcVertexId = Long.parseLong(args[0]);
+ edgesInputPath = args[1];
+ outputPath = args[2];
maxIterations = Integer.parseInt(args[3]);
} else {
- System.out.println("Executing GSA Single Source Shortest Paths example with built-in default data.");
- System.out.println(" Provide parameters to read input data from files.");
- System.out.println(" See the documentation for the correct format of input files.");
- System.out.println(" Usage: GSASingleSourceShortestPathsExample <edge path> <result path> <src vertex> "
- + "<max iterations>");
+ System.out.println("Executing GSASingle Source Shortest Paths example "
+ + "with default parameters and built-in default data.");
+ System.out.println(" Provide parameters to read input data from files.");
+ System.out.println(" See the documentation for the correct format of input files.");
+ System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+ " <input edges path> <output path> <num iterations>");
}
return true;
}
private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
if (fileOutput) {
- return env.readCsvFile(edgeInputPath)
- .fieldDelimiter(" ")
+ return env.readCsvFile(edgesInputPath)
+ .fieldDelimiter("\t")
.lineDelimiter("\n")
.types(Long.class, Long.class, Double.class)
- .map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
- @Override
- public Edge<Long, Double> map(Tuple3<Long, Long, Double> value) throws Exception {
- return new Edge<Long, Double>(value.f0, value.f1, value.f2);
- }
- });
+ .map(new Tuple3ToEdgeMap<Long, Double>());
} else {
return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
}