You are viewing a plain-text version of this content; the hyperlink to its canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:01:08 UTC

[5/9] flink git commit: [FLINK-1514] [gelly] improvements to the GSA SSSP example; improvements to the GSA Connected Components example

[FLINK-1514] [gelly] improvements to the GSA SSSP example;
improvements to the GSA Connected Components example


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/921414d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/921414d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/921414d6

Branch: refs/heads/master
Commit: 921414d6e3582cb42820475ef22444b3eea26c96
Parents: 837508d
Author: vasia <va...@apache.org>
Authored: Tue Apr 21 01:06:13 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 20:00:11 2015 +0200

----------------------------------------------------------------------
 .../example/GSAConnectedComponentsExample.java  |  54 ++++-----
 .../GSASingleSourceShortestPathsExample.java    | 118 ++++++++-----------
 2 files changed, 68 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/921414d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index 6a2c250..fca7b1d 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -52,22 +52,13 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
-		DataSet<Vertex<Long, Long>> vertices = edges.flatMap(new InitVerticesMapper()).distinct();
 
-		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
-
-		// Simply return the vertex value of each vertex
-		GatherFunction<Long, NullValue, Long> gather = new ConnectedComponentsGather();
-
-		// Select the lower value among neighbors
-		SumFunction<Long, NullValue, Long> sum = new ConnectedComponentsSum();
-
-		// Set the lower value for each vertex
-		ApplyFunction<Long, NullValue, Long> apply = new ConnectedComponentsApply();
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
 
 		// Execute the GSA iteration
 		Graph<Long, Long, NullValue> result =
-				graph.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
+				graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
+						new UpdateComponentId(), maxIterations);
 
 		// Extract the vertices as the result
 		DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
@@ -82,13 +73,11 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 		env.execute("GSA Connected Components");
 	}
 
-	private static final class InitVerticesMapper
-			implements FlatMapFunction<Edge<Long, NullValue>, Vertex<Long, Long>>{
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Long> {
-
 		@Override
-		public void flatMap(Edge<Long, NullValue> edge, Collector<Vertex<Long, Long>> out) throws Exception {
-			out.collect(new Vertex<Long, Long>(edge.getSource(), edge.getSource()));
-			out.collect(new Vertex<Long, Long>(edge.getTarget(), edge.getTarget()));
+		public Long map(Long vertexId) {
+			return vertexId;
 		}
 	}
 
@@ -96,29 +85,26 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 	//  Connected Components UDFs
 	// --------------------------------------------------------------------------------------------
 
-	private static final class ConnectedComponentsGather
-			extends GatherFunction<Long, NullValue, Long> {
+	@SuppressWarnings("serial")
+	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
 		@Override
-		public Long gather(Neighbor<Long, NullValue> richEdge) {
-
-			return richEdge.getSrcVertexValue();
+		public Long gather(Neighbor<Long, NullValue> neighbor) {
+			return neighbor.getSrcVertexValue();
 		}
 	};
 
-	private static final class ConnectedComponentsSum
-			extends SumFunction<Long, NullValue, Long> {
+	@SuppressWarnings("serial")
+	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
 		@Override
-		public Long sum(Long newValue, Long currentValue) {
-
+		public Long sum(Long newValue, Long currentValue) {
 			return Math.min(newValue, currentValue);
 		}
 	};
 
-	private static final class ConnectedComponentsApply
-			extends ApplyFunction<Long, NullValue, Long> {
+	@SuppressWarnings("serial")
+	private static final class UpdateComponentId extends ApplyFunction<Long, NullValue, Long> {
 		@Override
-		public void apply(Long summedValue, Long origValue) {
-
+		public void apply(Long summedValue, Long origValue) {
 			if (summedValue < origValue) {
 				setResult(summedValue);
 			}
@@ -159,14 +145,15 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 		return true;
 	}
 
+	@SuppressWarnings("serial")
 	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
 		if (fileOutput) {
 			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter(" ")
+					.fieldDelimiter("\t")
 					.lineDelimiter("\n")
 					.types(Long.class, Long.class)
 					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-						@Override
+						@Override
 						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
 							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
 						}
@@ -186,5 +173,4 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 	public String getDescription() {
 		return "GSA Connected Components";
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/921414d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index cc3b054..488c66c 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -19,11 +19,9 @@
 package org.apache.flink.graph.example;
 
 import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
@@ -32,7 +30,7 @@ import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.util.Collector;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
  * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
@@ -52,24 +50,13 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
-		DataSet<Vertex<Long, Double>> vertices = edges
-				.flatMap(new InitVerticesMapper(srcVertexId))
-				.distinct();
 
-		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
-
-		// The path from src to trg through edge e costs src + e
-		GatherFunction<Double, Double, Double> gather = new SingleSourceShortestPathGather();
-
-		// Return the smaller path length to minimize distance
-		SumFunction<Double, Double, Double> sum = new SingleSourceShortestPathSum();
-
-		// Iterate as long as the distance is updated
-		ApplyFunction<Double, Double, Double> apply = new SingleSourceShortestPathApply();
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
 
 		// Execute the GSA iteration
 		Graph<Long, Double, Double> result = graph
-				.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
+				.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
+						new UpdateDistance(), maxIterations);
 
 		// Extract the vertices as the result
 		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
@@ -84,27 +71,21 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 		env.execute("GSA Single Source Shortest Paths Example");
 	}
 
-	private static final class InitVerticesMapper
-			implements FlatMapFunction<Edge<Long, Double>, Vertex<Long, Double>>{
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Double> {
 
-		private long srcVertexId;
+		private final long srcId;
 
-		public InitVerticesMapper(long srcId) {
-			this.srcVertexId = srcId;
+		public InitVertices(long srcId) {
+			this.srcId = srcId;
 		}
 
 		@Override
-		public void flatMap(Edge<Long, Double> edge, Collector<Vertex<Long, Double>> out) throws Exception {
-			if (edge.getSource() == srcVertexId) {
-				out.collect(new Vertex<Long, Double>(edge.getSource(), 0.0));
-			} else {
-				out.collect(new Vertex<Long, Double>(edge.getSource(), Double.POSITIVE_INFINITY));
+		public Double map(Long id) {
+			if (id.equals(srcId)) {
+				return 0.0;
-			}
-
-			if (edge.getTarget() == srcVertexId) {
-				out.collect(new Vertex<Long, Double>(edge.getTarget(), 0.0));
-			} else {
-				out.collect(new Vertex<Long, Double>(edge.getTarget(), Double.POSITIVE_INFINITY));
+			} else {
+				return Double.MAX_VALUE;
 			}
 		}
 	}
@@ -113,28 +94,28 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 	//  Single Source Shortest Path UDFs
 	// --------------------------------------------------------------------------------------------
 
-	private static final class SingleSourceShortestPathGather
-			extends GatherFunction<Double, Double, Double> {
-		@Override
+	@SuppressWarnings("serial")
+	private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
+		@Override
 		public Double gather(Neighbor<Double, Double> richEdge) {
 			return richEdge.getSrcVertexValue() + richEdge.getEdgeValue();
 		}
 	};
 
-	private static final class SingleSourceShortestPathSum
-			extends SumFunction<Double, Double, Double> {
-		@Override
+	@SuppressWarnings("serial")
+	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
+		@Override
 		public Double sum(Double newValue, Double currentValue) {
 			return Math.min(newValue, currentValue);
 		}
 	};
 
-	private static final class SingleSourceShortestPathApply
-			extends ApplyFunction<Double, Double, Double> {
-		@Override
-		public void apply(Double summed, Double target) {
-			if (summed < target) {
-				setResult(summed);
+	@SuppressWarnings("serial")
+	private static final class UpdateDistance extends ApplyFunction<Double, Double, Double> {
+		@Override
+		public void apply(Double newDistance, Double oldDistance) {
+			if (newDistance < oldDistance) {
+				setResult(newDistance);
 			}
 		}
 	};
@@ -144,50 +125,47 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 	// --------------------------------------------------------------------------------------------
 
 	private static boolean fileOutput = false;
-	private static String edgeInputPath = null;
+
+	private static long srcVertexId = 1L;
+
+	private static String edgesInputPath = null;
+
 	private static String outputPath = null;
 
-	private static int maxIterations = 2;
-	private static long srcVertexId = 1;
+	private static int maxIterations = 5;
 
 	private static boolean parseParameters(String[] args) {
 
 		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-
-			if (args.length != 4) {
-				System.err.println("Usage: GSASingleSourceShortestPathsExample <edge path> " +
-						"<result path> <src vertex> <max iterations>");
+			if (args.length != 4) {
+				System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
 				return false;
 			}
 
-			edgeInputPath = args[0];
-			outputPath = args[1];
-			srcVertexId = Long.parseLong(args[2]);
+			fileOutput = true;
+			srcVertexId = Long.parseLong(args[0]);
+			edgesInputPath = args[1];
+			outputPath = args[2];
 			maxIterations = Integer.parseInt(args[3]);
 		} else {
-			System.out.println("Executing GSA Single Source Shortest Paths example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: GSASingleSourceShortestPathsExample <edge path> <result path> <src vertex> "
-					+ "<max iterations>");
+			System.out.println("Executing GSA Single Source Shortest Paths example "
+					+ "with default parameters and built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+					" <input edges path> <output path> <num iterations>");
 		}
 		return true;
 	}
 
 	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
 		if (fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter(" ")
+			return env.readCsvFile(edgesInputPath)
+					.fieldDelimiter("\t")
 					.lineDelimiter("\n")
 					.types(Long.class, Long.class, Double.class)
-					.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
-						@Override
-						public Edge<Long, Double> map(Tuple3<Long, Long, Double> value) throws Exception {
-							return new Edge<Long, Double>(value.f0, value.f1, value.f2);
-						}
-					});
+					.map(new Tuple3ToEdgeMap<Long, Double>());
 		} else {
 			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
 		}