You are viewing a plain-text version of this content. (The hyperlink to the canonical version was lost in the plain-text conversion.)
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:01:04 UTC

[1/9] flink git commit: [FLINK-1514] [gelly] Unified create and run for GSA iterations

Repository: flink
Updated Branches:
  refs/heads/master b2aafe585 -> 6e24879b9


[FLINK-1514] [gelly] Unified create and run for GSA iterations


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/740d437a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/740d437a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/740d437a

Branch: refs/heads/master
Commit: 740d437a3d07f322d5883b840f21a01090ccea26
Parents: e1f56e9
Author: Dániel Bali <ba...@gmail.com>
Authored: Fri Apr 17 16:28:55 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 19:52:23 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/flink/graph/Graph.java | 30 +++++++-
 .../example/GSAConnectedComponentsExample.java  | 64 ++++++----------
 .../GSASingleSourceShortestPathsExample.java    | 79 ++++++++------------
 .../graph/gsa/GatherSumApplyIteration.java      | 42 -----------
 .../flink/graph/test/GatherSumApplyITCase.java  | 25 ++-----
 5 files changed, 91 insertions(+), 149 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/740d437a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 2a66aca..d564f24 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -43,6 +43,7 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
@@ -50,11 +51,11 @@ import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.GatherSumApplyIteration;
 import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.spargel.IterationConfiguration;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.graph.utils.EdgeToTuple3Map;
-import org.apache.flink.graph.utils.GraphUtils;
 import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
@@ -351,7 +352,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 
 					@Override
 					public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet,
-					                 Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
+							Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
 
 						collector.collect(new Triplet<K, VV, EV>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
 								tripletWithSrcValSet.f2, vertex.getValue(), tripletWithSrcValSet.f3));
@@ -1205,6 +1206,31 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
 	}
 
+	/**
+	 * Runs a Gather-Sum-Apply iteration on the graph.
+	 * No configuration options are provided.
+	 *
+	 * @param gatherFunction the gather function collects information about adjacent vertices and edges
+	 * @param sumFunction the sum function aggregates the gathered information
+	 * @param applyFunction the apply function updates the vertex values with the aggregates
+	 * @param maximumNumberOfIterations maximum number of iterations to perform
+	 * @param <M> the intermediate type used between gather, sum and apply
+	 *
+	 * @return the updated Graph after the gather-sum-apply iteration has converged or
+	 * after maximumNumberOfIterations.
+	 */
+	public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
+			GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
+			ApplyFunction<VV, EV, M> applyFunction, int maximumNumberOfIterations) {
+
+		GatherSumApplyIteration<K, VV, EV, M> iteration = GatherSumApplyIteration.withEdges(
+				edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);
+
+		DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
+
+		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
+	}
+
 	public Graph<K, VV, EV> run(GraphAlgorithm<K, VV, EV> algorithm) throws Exception {
 		return algorithm.run(this);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/740d437a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index 88938b5..d338b03 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -29,14 +29,13 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.GatherSumApplyIteration;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.RichEdge;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
 /**
- * This is an implementation of the connected components algorithm, using a gather-sum-apply iteration
+ * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration
  */
 public class GSAConnectedComponentsExample implements ProgramDescription {
 
@@ -52,8 +51,8 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
 		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+		DataSet<Vertex<Long, Long>> vertices = edges.flatMap(new InitVerticesMapper()).distinct();
 
 		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
 
@@ -67,9 +66,8 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 		ApplyFunction<Long, NullValue, Long> apply = new ConnectedComponentsApply();
 
 		// Execute the GSA iteration
-		GatherSumApplyIteration<Long, Long, NullValue, Long> iteration =
-				graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
-		Graph<Long, Long, NullValue> result = graph.runGatherSumApplyIteration(iteration);
+		Graph<Long, Long, NullValue> result =
+				graph.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
 
 		// Extract the vertices as the result
 		DataSet<Vertex<Long, Long>> greedyGraphColoring = result.getVertices();
@@ -84,6 +82,16 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 		env.execute("GSA Connected Components");
 	}
 
+	private static final class InitVerticesMapper
+			implements FlatMapFunction<Edge<Long, NullValue>, Vertex<Long, Long>>{
+
+		@Override
+		public void flatMap(Edge<Long, NullValue> edge, Collector<Vertex<Long, Long>> out) throws Exception {
+			out.collect(new Vertex<Long, Long>(edge.getSource(), edge.getSource()));
+			out.collect(new Vertex<Long, Long>(edge.getTarget(), edge.getTarget()));
+		}
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Connected Components UDFs
 	// --------------------------------------------------------------------------------------------
@@ -122,7 +130,6 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 	// --------------------------------------------------------------------------------------------
 
 	private static boolean fileOutput = false;
-	private static String vertexInputPath = null;
 	private static String edgeInputPath = null;
 	private static String outputPath = null;
 
@@ -130,55 +137,30 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 
 	private static boolean parseParameters(String[] args) {
 
-		if(args.length > 0) {
+		if (args.length > 0) {
 			// parse input arguments
 			fileOutput = true;
 
-			if(args.length != 4) {
-				System.err.println("Usage: GSAConnectedComponentsExample <vertex path> <edge path> " +
+			if (args.length != 3) {
+				System.err.println("Usage: GSAConnectedComponentsExample <edge path> " +
 						"<result path> <max iterations>");
 				return false;
 			}
 
-			vertexInputPath = args[0];
-			edgeInputPath = args[1];
-			outputPath = args[2];
-			maxIterations = Integer.parseInt(args[3]);
+			edgeInputPath = args[0];
+			outputPath = args[1];
+			maxIterations = Integer.parseInt(args[2]);
 		} else {
 			System.out.println("Executing GSA Connected Components example with built-in default data.");
 			System.out.println("  Provide parameters to read input data from files.");
 			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: GSAConnectedComponentsExample <vertex path> <edge path> "
-					+ "<result path> <max iterations>");
+			System.out.println("  Usage: GSAConnectedComponentsExample <edge path> <result path> <max iterations>");
 		}
 		return true;
 	}
 
-	private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			return env
-					.readCsvFile(vertexInputPath)
-					.fieldDelimiter(" ")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class)
-					.map(new MapFunction<Tuple2<Long, Long>, Vertex<Long, Long>>() {
-						@Override
-						public Vertex<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
-							return new Vertex<Long, Long>(value.f0, value.f1);
-						}
-					});
-		}
-
-		return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Long>>() {
-			@Override
-			public Vertex<Long, Long> map(Long value) throws Exception {
-				return new Vertex<Long, Long>(value, value);
-			}
-		});
-	}
-
 	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
+		if (fileOutput) {
 			return env.readCsvFile(edgeInputPath)
 					.fieldDelimiter(" ")
 					.lineDelimiter("\n")
@@ -202,7 +184,7 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 
 	@Override
 	public String getDescription() {
-		return "GSA Greedy Graph Coloring";
+		return "GSA Connected Components";
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/740d437a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index 9c8328b..8967a90 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -19,10 +19,10 @@
 package org.apache.flink.graph.example;
 
 import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -30,11 +30,9 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.GatherSumApplyIteration;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.RichEdge;
-
-import java.io.Serializable;
+import org.apache.flink.util.Collector;
 
 /**
  * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
@@ -53,8 +51,10 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
 		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+		DataSet<Vertex<Long, Double>> vertices = edges
+				.flatMap(new InitVerticesMapper(srcVertexId))
+				.distinct();
 
 		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
 
@@ -68,10 +68,8 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 		ApplyFunction<Double, Double, Double> apply = new SingleSourceShortestPathApply();
 
 		// Execute the GSA iteration
-		GatherSumApplyIteration<Long, Double, Double, Double> iteration = graph.createGatherSumApplyIteration(
-				gather, sum, apply, maxIterations);
-		Graph<Long, Double, Double> result = graph.mapVertices(new InitVerticesMapper<Long>(srcVertexId))
-				.runGatherSumApplyIteration(iteration);
+		Graph<Long, Double, Double> result = graph
+				.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
 
 		// Extract the vertices as the result
 		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
@@ -86,20 +84,27 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 		env.execute("GSA Single Source Shortest Paths Example");
 	}
 
-	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
-			implements MapFunction<Vertex<K, Double>, Double> {
+	private static final class InitVerticesMapper
+			implements FlatMapFunction<Edge<Long, Double>, Vertex<Long, Double>>{
 
-		private K srcVertexId;
+		private long srcVertexId;
 
-		public InitVerticesMapper(K srcId) {
+		public InitVerticesMapper(long srcId) {
 			this.srcVertexId = srcId;
 		}
 
-		public Double map(Vertex<K, Double> value) {
-			if (value.f0.equals(srcVertexId)) {
-				return 0.0;
+		@Override
+		public void flatMap(Edge<Long, Double> edge, Collector<Vertex<Long, Double>> out) throws Exception {
+			if (edge.getSource() == srcVertexId) {
+				out.collect(new Vertex<Long, Double>(edge.getSource(), 0.0));
+			} else {
+				out.collect(new Vertex<Long, Double>(edge.getSource(), Double.POSITIVE_INFINITY));
+			}
+
+			if (edge.getTarget() == srcVertexId) {
+				out.collect(new Vertex<Long, Double>(edge.getTarget(), 0.0));
 			} else {
-				return Double.POSITIVE_INFINITY;
+				out.collect(new Vertex<Long, Double>(edge.getTarget(), Double.POSITIVE_INFINITY));
 			}
 		}
 	}
@@ -139,7 +144,6 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 	// --------------------------------------------------------------------------------------------
 
 	private static boolean fileOutput = false;
-	private static String vertexInputPath = null;
 	private static String edgeInputPath = null;
 	private static String outputPath = null;
 
@@ -148,51 +152,32 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 
 	private static boolean parseParameters(String[] args) {
 
-		if(args.length > 0) {
+		if (args.length > 0) {
 			// parse input arguments
 			fileOutput = true;
 
-			if(args.length != 5) {
-				System.err.println("Usage: GSASingleSourceShortestPathsExample <vertex path> <edge path> " +
+			if (args.length != 4) {
+				System.err.println("Usage: GSASingleSourceShortestPathsExample <edge path> " +
 						"<result path> <src vertex> <max iterations>");
 				return false;
 			}
 
-			vertexInputPath = args[0];
-			edgeInputPath = args[1];
-			outputPath = args[2];
-			srcVertexId = Long.parseLong(args[3]);
-			maxIterations = Integer.parseInt(args[4]);
+			edgeInputPath = args[0];
+			outputPath = args[1];
+			srcVertexId = Long.parseLong(args[2]);
+			maxIterations = Integer.parseInt(args[3]);
 		} else {
 			System.out.println("Executing GSA Single Source Shortest Paths example with built-in default data.");
 			System.out.println("  Provide parameters to read input data from files.");
 			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: GSASingleSourceShortestPathsExample <vertex path> <edge path> "
-					+ "<result path> <src vertex> <max iterations>");
+			System.out.println("  Usage: GSASingleSourceShortestPathsExample <edge path> <result path> <src vertex> "
+					+ "<max iterations>");
 		}
 		return true;
 	}
 
-	private static DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			return env
-					.readCsvFile(vertexInputPath)
-					.fieldDelimiter(" ")
-					.lineDelimiter("\n")
-					.types(Long.class, Double.class)
-					.map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
-						@Override
-						public Vertex<Long, Double> map(Tuple2<Long, Double> value) throws Exception {
-							return new Vertex<Long, Double>(value.f0, value.f1);
-						}
-					});
-		} else {
-			return SingleSourceShortestPathsData.getDefaultVertexDataSet(env);
-		}
-	}
-
 	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
+		if (fileOutput) {
 			return env.readCsvFile(edgeInputPath)
 					.fieldDelimiter(" ")
 					.lineDelimiter("\n")

http://git-wip-us.apache.org/repos/asf/flink/blob/740d437a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 426efbb..1adab29 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -57,9 +57,6 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 	private final ApplyFunction<VV, EV, M> apply;
 	private final int maximumNumberOfIterations;
 
-	private String name;
-	private int parallelism = -1;
-
 	// ----------------------------------------------------------------------------------
 
 	private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
@@ -78,45 +75,6 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 		this.maximumNumberOfIterations = maximumNumberOfIterations;
 	}
 
-
-	/**
-	 * Sets the name for the gather-sum-apply iteration. The name is displayed in logs and messages.
-	 *
-	 * @param name The name for the iteration.
-	 */
-	public void setName(String name) {
-		this.name = name;
-	}
-
-	/**
-	 * Gets the name from this gather-sum-apply iteration.
-	 *
-	 * @return The name of the iteration.
-	 */
-	public String getName() {
-		return name;
-	}
-
-	/**
-	 * Sets the degree of parallelism for the iteration.
-	 *
-	 * @param parallelism The degree of parallelism.
-	 */
-	public void setParallelism(int parallelism) {
-		Validate.isTrue(parallelism > 0 || parallelism == -1,
-				"The degree of parallelism must be positive, or -1 (use default).");
-		this.parallelism = parallelism;
-	}
-
-	/**
-	 * Gets the iteration's degree of parallelism.
-	 *
-	 * @return The iterations parallelism, or -1, if not set.
-	 */
-	public int getParallelism() {
-		return parallelism;
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Custom Operator behavior
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/740d437a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 0f5fe47..43377d6 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -40,7 +40,6 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 		super(mode);
 	}
 
-	private String verticesPath;
 	private String edgesPath;
 	private String resultPath;
 	private String expectedResult;
@@ -51,13 +50,10 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 	@Before
 	public void before() throws Exception{
 		resultPath = tempFolder.newFile().toURI().toString();
-		File verticesFile = tempFolder.newFile();
-		Files.write(GatherSumApplyITCase.VERTICES, verticesFile, Charsets.UTF_8);
 
 		File edgesFile = tempFolder.newFile();
 		Files.write(GatherSumApplyITCase.EDGES, edgesFile, Charsets.UTF_8);
 
-		verticesPath = verticesFile.toURI().toString();
 		edgesPath = edgesFile.toURI().toString();
 
 	}
@@ -73,13 +69,14 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testGreedyGraphColoring() throws Exception {
-		GSAConnectedComponentsExample.main(new String[]{verticesPath, edgesPath, resultPath, "16"});
+		GSAConnectedComponentsExample.main(new String[]{edgesPath, resultPath, "16"});
 		expectedResult = "1 1\n" +
 				"2 1\n" +
 				"3 1\n" +
 				"4 1\n" +
 				"5 1\n" +
-				"6 6\n";
+				"6 6\n" +
+				"7 6\n";
 
 	}
 
@@ -89,13 +86,14 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testSingleSourceShortestPath() throws Exception {
-		GSASingleSourceShortestPathsExample.main(new String[]{verticesPath, edgesPath, resultPath, "1", "16"});
+		GSASingleSourceShortestPathsExample.main(new String[]{edgesPath, resultPath, "1", "16"});
 		expectedResult = "1 0.0\n" +
 				"2 12.0\n" +
 				"3 13.0\n" +
 				"4 47.0\n" +
 				"5 48.0\n" +
-				"6 Infinity\n";
+				"6 Infinity\n" +
+				"7 Infinity\n";
 	}
 
 
@@ -103,19 +101,12 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 	//  Sample data
 	// --------------------------------------------------------------------------------------------
 
-	private static final String VERTICES = "1 1\n" +
-			"2 2\n" +
-			"3 3\n" +
-			"4 4\n" +
-			"5 5\n" +
-			"6 6\n";
-
 	private static final String EDGES = "1 2 12.0\n" +
 			"1 3 13.0\n" +
 			"2 3 23.0\n" +
 			"3 4 34.0\n" +
 			"3 5 35.0\n" +
 			"4 5 45.0\n" +
-			"5 1 51.0\n";
-
+			"5 1 51.0\n" +
+			"6 7 67.0\n";
 }


[5/9] flink git commit: [FLINK-1514] [gelly] improvements to the GSA SSSP example; improvements to the GSA Connected Components example

Posted by va...@apache.org.
[FLINK-1514] [gelly] improvements to the GSA SSSP example;
improvements to the GSA Connected Components example


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/921414d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/921414d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/921414d6

Branch: refs/heads/master
Commit: 921414d6e3582cb42820475ef22444b3eea26c96
Parents: 837508d
Author: vasia <va...@apache.org>
Authored: Tue Apr 21 01:06:13 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 20:00:11 2015 +0200

----------------------------------------------------------------------
 .../example/GSAConnectedComponentsExample.java  |  54 ++++-----
 .../GSASingleSourceShortestPathsExample.java    | 118 ++++++++-----------
 2 files changed, 68 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/921414d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index 6a2c250..fca7b1d 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -52,22 +52,13 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
-		DataSet<Vertex<Long, Long>> vertices = edges.flatMap(new InitVerticesMapper()).distinct();
 
-		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
-
-		// Simply return the vertex value of each vertex
-		GatherFunction<Long, NullValue, Long> gather = new ConnectedComponentsGather();
-
-		// Select the lower value among neighbors
-		SumFunction<Long, NullValue, Long> sum = new ConnectedComponentsSum();
-
-		// Set the lower value for each vertex
-		ApplyFunction<Long, NullValue, Long> apply = new ConnectedComponentsApply();
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
 
 		// Execute the GSA iteration
 		Graph<Long, Long, NullValue> result =
-				graph.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
+				graph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
+						new UpdateComponentId(), maxIterations);
 
 		// Extract the vertices as the result
 		DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
@@ -82,13 +73,11 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 		env.execute("GSA Connected Components");
 	}
 
-	private static final class InitVerticesMapper
-			implements FlatMapFunction<Edge<Long, NullValue>, Vertex<Long, Long>>{
+	@SuppressWarnings("serial")
+	private static final class InitVertices	implements MapFunction<Long, Long> {
 
-		@Override
-		public void flatMap(Edge<Long, NullValue> edge, Collector<Vertex<Long, Long>> out) throws Exception {
-			out.collect(new Vertex<Long, Long>(edge.getSource(), edge.getSource()));
-			out.collect(new Vertex<Long, Long>(edge.getTarget(), edge.getTarget()));
+		public Long map(Long vertexId) {
+			return vertexId;
 		}
 	}
 
@@ -96,29 +85,26 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 	//  Connected Components UDFs
 	// --------------------------------------------------------------------------------------------
 
-	private static final class ConnectedComponentsGather
-			extends GatherFunction<Long, NullValue, Long> {
-		@Override
-		public Long gather(Neighbor<Long, NullValue> richEdge) {
+	@SuppressWarnings("serial")
+	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
 
-			return richEdge.getSrcVertexValue();
+		public Long gather(Neighbor<Long, NullValue> neighbor) {
+			return neighbor.getSrcVertexValue();
 		}
 	};
 
-	private static final class ConnectedComponentsSum
-			extends SumFunction<Long, NullValue, Long> {
-		@Override
-		public Long sum(Long newValue, Long currentValue) {
+	@SuppressWarnings("serial")
+	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
 
+		public Long sum(Long newValue, Long currentValue) {
 			return Math.min(newValue, currentValue);
 		}
 	};
 
-	private static final class ConnectedComponentsApply
-			extends ApplyFunction<Long, NullValue, Long> {
-		@Override
-		public void apply(Long summedValue, Long origValue) {
+	@SuppressWarnings("serial")
+	private static final class UpdateComponentId extends ApplyFunction<Long, NullValue, Long> {
 
+		public void apply(Long summedValue, Long origValue) {
 			if (summedValue < origValue) {
 				setResult(summedValue);
 			}
@@ -159,14 +145,15 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 		return true;
 	}
 
+	@SuppressWarnings("serial")
 	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
 		if (fileOutput) {
 			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter(" ")
+					.fieldDelimiter("\t")
 					.lineDelimiter("\n")
 					.types(Long.class, Long.class)
 					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
-						@Override
+
 						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
 							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
 						}
@@ -186,5 +173,4 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 	public String getDescription() {
 		return "GSA Connected Components";
 	}
-
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/921414d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index cc3b054..488c66c 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -19,11 +19,9 @@
 package org.apache.flink.graph.example;
 
 import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
@@ -32,7 +30,7 @@ import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.util.Collector;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 
 /**
  * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
@@ -52,24 +50,13 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
-		DataSet<Vertex<Long, Double>> vertices = edges
-				.flatMap(new InitVerticesMapper(srcVertexId))
-				.distinct();
 
-		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
-
-		// The path from src to trg through edge e costs src + e
-		GatherFunction<Double, Double, Double> gather = new SingleSourceShortestPathGather();
-
-		// Return the smaller path length to minimize distance
-		SumFunction<Double, Double, Double> sum = new SingleSourceShortestPathSum();
-
-		// Iterate as long as the distance is updated
-		ApplyFunction<Double, Double, Double> apply = new SingleSourceShortestPathApply();
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(edges, new InitVertices(srcVertexId), env);
 
 		// Execute the GSA iteration
 		Graph<Long, Double, Double> result = graph
-				.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
+				.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
+						new UpdateDistance(), maxIterations);
 
 		// Extract the vertices as the result
 		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
@@ -84,27 +71,21 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 		env.execute("GSA Single Source Shortest Paths Example");
 	}
 
-	private static final class InitVerticesMapper
-			implements FlatMapFunction<Edge<Long, Double>, Vertex<Long, Double>>{
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Double>{
 
-		private long srcVertexId;
+		private long srcId;
 
-		public InitVerticesMapper(long srcId) {
-			this.srcVertexId = srcId;
+		public InitVertices(long srcId) {
+			this.srcId = srcId;
 		}
 
-		@Override
-		public void flatMap(Edge<Long, Double> edge, Collector<Vertex<Long, Double>> out) throws Exception {
-			if (edge.getSource() == srcVertexId) {
-				out.collect(new Vertex<Long, Double>(edge.getSource(), 0.0));
-			} else {
-				out.collect(new Vertex<Long, Double>(edge.getSource(), Double.POSITIVE_INFINITY));
+		public Double map(Long id) {
+			if (id.equals(srcId)) {
+				return 0.0;
 			}
-
-			if (edge.getTarget() == srcVertexId) {
-				out.collect(new Vertex<Long, Double>(edge.getTarget(), 0.0));
-			} else {
-				out.collect(new Vertex<Long, Double>(edge.getTarget(), Double.POSITIVE_INFINITY));
+			else {
+				return Double.MAX_VALUE;
 			}
 		}
 	}
@@ -113,28 +94,28 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 	//  Single Source Shortest Path UDFs
 	// --------------------------------------------------------------------------------------------
 
-	private static final class SingleSourceShortestPathGather
-			extends GatherFunction<Double, Double, Double> {
-		@Override
+	@SuppressWarnings("serial")
+	private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
+
 		public Double gather(Neighbor<Double, Double> richEdge) {
 			return richEdge.getSrcVertexValue() + richEdge.getEdgeValue();
 		}
 	};
 
-	private static final class SingleSourceShortestPathSum
-			extends SumFunction<Double, Double, Double> {
-		@Override
+	@SuppressWarnings("serial")
+	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
+
 		public Double sum(Double newValue, Double currentValue) {
 			return Math.min(newValue, currentValue);
 		}
 	};
 
-	private static final class SingleSourceShortestPathApply
-			extends ApplyFunction<Double, Double, Double> {
-		@Override
-		public void apply(Double summed, Double target) {
-			if (summed < target) {
-				setResult(summed);
+	@SuppressWarnings("serial")
+	private static final class UpdateDistance extends ApplyFunction<Double, Double, Double> {
+
+		public void apply(Double newDistance, Double oldDistance) {
+			if (newDistance < oldDistance) {
+				setResult(newDistance);
 			}
 		}
 	};
@@ -144,50 +125,47 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 	// --------------------------------------------------------------------------------------------
 
 	private static boolean fileOutput = false;
-	private static String edgeInputPath = null;
+
+	private static Long srcVertexId = 1l;
+
+	private static String edgesInputPath = null;
+
 	private static String outputPath = null;
 
-	private static int maxIterations = 2;
-	private static long srcVertexId = 1;
+	private static int maxIterations = 5;
 
 	private static boolean parseParameters(String[] args) {
 
 		if (args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-
-			if (args.length != 4) {
-				System.err.println("Usage: GSASingleSourceShortestPathsExample <edge path> " +
-						"<result path> <src vertex> <max iterations>");
+			if(args.length != 4) {
+				System.err.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
 				return false;
 			}
 
-			edgeInputPath = args[0];
-			outputPath = args[1];
-			srcVertexId = Long.parseLong(args[2]);
+			fileOutput = true;
+			srcVertexId = Long.parseLong(args[0]);
+			edgesInputPath = args[1];
+			outputPath = args[2];
 			maxIterations = Integer.parseInt(args[3]);
 		} else {
-			System.out.println("Executing GSA Single Source Shortest Paths example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: GSASingleSourceShortestPathsExample <edge path> <result path> <src vertex> "
-					+ "<max iterations>");
+				System.out.println("Executing GSASingle Source Shortest Paths example "
+						+ "with default parameters and built-in default data.");
+				System.out.println("  Provide parameters to read input data from files.");
+				System.out.println("  See the documentation for the correct format of input files.");
+				System.out.println("Usage: GSASingleSourceShortestPaths <source vertex id>" +
+						" <input edges path> <output path> <num iterations>");
 		}
 		return true;
 	}
 
 	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
 		if (fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter(" ")
+			return env.readCsvFile(edgesInputPath)
+					.fieldDelimiter("\t")
 					.lineDelimiter("\n")
 					.types(Long.class, Long.class, Double.class)
-					.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
-						@Override
-						public Edge<Long, Double> map(Tuple3<Long, Long, Double> value) throws Exception {
-							return new Edge<Long, Double>(value.f0, value.f1, value.f2);
-						}
-					});
+					.map(new Tuple3ToEdgeMap<Long, Double>());
 		} else {
 			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
 		}


[8/9] flink git commit: [FLINK-1514] [gelly] Simplified GatherUdf; added forwarded fields annotations

Posted by va...@apache.org.
[FLINK-1514] [gelly] Simplified GatherUdf; added forwarded fields annotations


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40f5f3a0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40f5f3a0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40f5f3a0

Branch: refs/heads/master
Commit: 40f5f3a0160df708ecc78356853138e640b59da5
Parents: 66d72ac
Author: vasia <va...@apache.org>
Authored: Thu Apr 23 21:56:56 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 20:00:59 2015 +0200

----------------------------------------------------------------------
 .../graph/gsa/GatherSumApplyIteration.java      | 36 +++++++++++++-------
 1 file changed, 23 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40f5f3a0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 992f840..7fcd427 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -19,11 +19,14 @@
 package org.apache.flink.graph.gsa;
 
 import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.RichFlatJoinFunction;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.api.common.functions.RichReduceFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.operators.JoinOperator;
@@ -119,11 +122,9 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 				vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
 
 		// Prepare the neighbors
-		DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> neighbors = iteration
-				.getWorkset()
-				.join(edgeDataSet)
-				.where(0)
-				.equalTo(0);
+		DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors = iteration
+				.getWorkset().join(edgeDataSet)
+				.where(0).equalTo(0).with(new ProjectKeyWithNeighbor<K, VV, EV>());
 
 		// Gather, sum and apply
 		DataSet<Tuple2<K, M>> gatheredSet = neighbors.map(gatherUdf);
@@ -170,8 +171,9 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 	// --------------------------------------------------------------------------------------------
 
 	@SuppressWarnings("serial")
+	@ForwardedFields("f0")
 	private static final class GatherUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
-			EV extends Serializable, M> extends RichMapFunction<Tuple2<Vertex<K, VV>, Edge<K, EV>>,
+			EV extends Serializable, M> extends RichMapFunction<Tuple2<K, Neighbor<VV, EV>>,
 			Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
 
 		private final GatherFunction<VV, EV, M> gatherFunction;
@@ -183,13 +185,9 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 		}
 
 		@Override
-		public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> neighborTuple) throws Exception {
-			Neighbor<VV, EV> neighbor = new Neighbor<VV, EV>(neighborTuple.f0.getValue(),
-					neighborTuple.f1.getValue());
-
-			K key = neighborTuple.f1.getTarget();
-			M result = this.gatherFunction.gather(neighbor);
-			return new Tuple2<K, M>(key, result);
+		public Tuple2<K, M> map(Tuple2<K, Neighbor<VV, EV>> neighborTuple) {
+			M result = this.gatherFunction.gather(neighborTuple.f1);
+			return new Tuple2<K, M>(neighborTuple.f0, result);
 		}
 
 		@Override
@@ -302,4 +300,16 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 		}
 	}
 
+	/** Emits (edge target, Neighbor(vertex value, edge value)) for every joined (vertex, edge) pair. */
+	@SuppressWarnings("serial")
+	@ForwardedFieldsSecond("f1->f0")
+	private static final class ProjectKeyWithNeighbor<K extends Comparable<K> & Serializable, VV extends Serializable,
+			EV extends Serializable> implements FlatJoinFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
+		@Override
+		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
+			out.collect(new Tuple2<K, Neighbor<VV, EV>>(
+					edge.getTarget(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
+		}
+	}
+
 }


[2/9] flink git commit: [FLINK-1514] [gelly] Removed GGC example, added Connected Components instead

Posted by va...@apache.org.
[FLINK-1514] [gelly] Removed GGC example, added Connected Components instead


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e1f56e9d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e1f56e9d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e1f56e9d

Branch: refs/heads/master
Commit: e1f56e9d767347c5c69f943921cc0cac29708036
Parents: 722719f
Author: Dániel Bali <ba...@gmail.com>
Authored: Fri Mar 20 15:35:53 2015 +0100
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 19:52:23 2015 +0200

----------------------------------------------------------------------
 .../example/GSAConnectedComponentsExample.java  | 208 +++++++++++++++++
 .../example/GSAGreedyGraphColoringExample.java  | 224 -------------------
 .../org/apache/flink/graph/gsa/RichEdge.java    |   3 +-
 .../flink/graph/test/GatherSumApplyITCase.java  |  31 +--
 4 files changed, 227 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
new file mode 100755
index 0000000..88938b5
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -0,0 +1,208 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * Connected Components using a gather-sum-apply iteration: each vertex repeatedly adopts the smallest value seen among its neighbors.
+ */
+public class GSAConnectedComponentsExample implements ProgramDescription {
+
+	// --------------------------------------------------------------------------------------------
+	//  Program
+	// --------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Vertex<Long, Long>> vertices = getVertexDataSet(env);
+		DataSet<Edge<Long, NullValue>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);
+
+		// Each neighbor contributes the current value (component id) of the edge's source vertex
+		GatherFunction<Long, NullValue, Long> gather = new ConnectedComponentsGather();
+
+		// Select the lower value among neighbors
+		SumFunction<Long, NullValue, Long> sum = new ConnectedComponentsSum();
+
+		// Set the lower value for each vertex
+		ApplyFunction<Long, NullValue, Long> apply = new ConnectedComponentsApply();
+
+		// Execute the GSA iteration
+		GatherSumApplyIteration<Long, Long, NullValue, Long> iteration =
+				graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
+		Graph<Long, Long, NullValue> result = graph.runGatherSumApplyIteration(iteration);
+
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
+
+		// emit result
+		if (fileOutput) {
+			connectedComponents.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			connectedComponents.print();
+		}
+
+		env.execute("GSA Connected Components");
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Connected Components UDFs
+	// --------------------------------------------------------------------------------------------
+
+	private static final class ConnectedComponentsGather
+			extends GatherFunction<Long, NullValue, Long> {
+		@Override
+		public Long gather(RichEdge<Long, NullValue> richEdge) {
+			// Propagate the source vertex's current component id towards the edge target
+			return richEdge.getSrcVertexValue();
+		}
+	}
+
+	private static final class ConnectedComponentsSum
+			extends SumFunction<Long, NullValue, Long> {
+		@Override
+		public Long sum(Long newValue, Long currentValue) {
+			// Keep the smaller of the two candidate component ids
+			return Math.min(newValue, currentValue);
+		}
+	}
+
+	private static final class ConnectedComponentsApply
+			extends ApplyFunction<Long, NullValue, Long> {
+		@Override
+		public void apply(Long summedValue, Long origValue) {
+			// Only update on a strictly smaller id, so the iteration can converge
+			if (summedValue < origValue) {
+				setResult(summedValue);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Util methods
+	// --------------------------------------------------------------------------------------------
+
+	private static boolean fileOutput = false;
+	private static String vertexInputPath = null;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static int maxIterations = 16;
+
+	private static boolean parseParameters(String[] args) {
+
+		if(args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+
+			if(args.length != 4) {
+				System.err.println("Usage: GSAConnectedComponentsExample <vertex path> <edge path> " +
+						"<result path> <max iterations>");
+				return false;
+			}
+
+			vertexInputPath = args[0];
+			edgeInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
+		} else {
+			System.out.println("Executing GSA Connected Components example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: GSAConnectedComponentsExample <vertex path> <edge path> "
+					+ "<result path> <max iterations>");
+		}
+		return true;
+	}
+
+	private static DataSet<Vertex<Long, Long>> getVertexDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return env
+					.readCsvFile(vertexInputPath)
+					.fieldDelimiter(" ")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Vertex<Long, Long>>() {
+						@Override
+						public Vertex<Long, Long> map(Tuple2<Long, Long> value) throws Exception {
+							return new Vertex<Long, Long>(value.f0, value.f1);
+						}
+					});
+		}
+
+		return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Long>>() {
+			@Override
+			public Vertex<Long, Long> map(Long value) throws Exception {
+				return new Vertex<Long, Long>(value, value);
+			}
+		});
+	}
+
+	private static DataSet<Edge<Long, NullValue>> getEdgeDataSet(ExecutionEnvironment env) {
+		if(fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter(" ")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, NullValue>>() {
+						@Override
+						public Edge<Long, NullValue> map(Tuple2<Long, Long> value) throws Exception {
+							return new Edge<Long, NullValue>(value.f0, value.f1, NullValue.getInstance());
+						}
+					});
+		}
+
+		// Generates 3 components of size 2
+		return env.generateSequence(0, 2).flatMap(new FlatMapFunction<Long, Edge<Long, NullValue>>() {
+			@Override
+			public void flatMap(Long value, Collector<Edge<Long, NullValue>> out) throws Exception {
+				out.collect(new Edge<Long, NullValue>(value, value + 3, NullValue.getInstance()));
+			}
+		});
+	}
+
+	@Override
+	public String getDescription() {
+		return "GSA Connected Components";
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
deleted file mode 100755
index 4acd86c..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
+++ /dev/null
@@ -1,224 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.example;
-
-import org.apache.flink.api.common.ProgramDescription;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.GatherSumApplyIteration;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.RichEdge;
-import org.apache.flink.util.Collector;
-
-import java.util.HashSet;
-
-/**
- * This is an implementation of the Greedy Graph Coloring algorithm, using a gather-sum-apply iteration
- */
-public class GSAGreedyGraphColoringExample implements ProgramDescription {
-
-	// --------------------------------------------------------------------------------------------
-	//  Program
-	// --------------------------------------------------------------------------------------------
-
-	public static void main(String[] args) throws Exception {
-
-		if (!parseParameters(args)) {
-			return;
-		}
-
-		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
-		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
-
-		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
-
-		// Gather the target vertices into a one-element set
-		GatherFunction<Double, Double, HashSet<Double>> gather = new GreedyGraphColoringGather();
-
-		// Merge the sets between neighbors
-		SumFunction<Double, Double, HashSet<Double>> sum = new GreedyGraphColoringSum();
-
-		// Find the minimum vertex id in the set which will be propagated
-		ApplyFunction<Double, Double, HashSet<Double>> apply = new GreedyGraphColoringApply();
-
-		// Execute the GSA iteration
-		GatherSumApplyIteration<Long, Double, Double, HashSet<Double>> iteration =
-				graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
-		Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(iteration);
-
-		// Extract the vertices as the result
-		DataSet<Vertex<Long, Double>> greedyGraphColoring = result.getVertices();
-
-		// emit result
-		if (fileOutput) {
-			greedyGraphColoring.writeAsCsv(outputPath, "\n", " ");
-		} else {
-			greedyGraphColoring.print();
-		}
-
-		env.execute("GSA Greedy Graph Coloring");
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Greedy Graph Coloring UDFs
-	// --------------------------------------------------------------------------------------------
-
-	private static final class GreedyGraphColoringGather
-			extends GatherFunction<Double, Double, HashSet<Double>> {
-		@Override
-		public HashSet<Double> gather(RichEdge<Double, Double> richEdge) {
-
-			HashSet<Double> result = new HashSet<Double>();
-			result.add(richEdge.getSrcVertexValue());
-
-			return result;
-		}
-	};
-
-	private static final class GreedyGraphColoringSum
-			extends SumFunction<Double, Double, HashSet<Double>> {
-		@Override
-		public HashSet<Double> sum(HashSet<Double> newValue, HashSet<Double> currentValue) {
-
-			HashSet<Double> result = new HashSet<Double>();
-			result.addAll(newValue);
-			result.addAll(currentValue);
-
-			return result;
-		}
-	};
-
-	private static final class GreedyGraphColoringApply
-			extends ApplyFunction<Double, Double, HashSet<Double>> {
-		@Override
-		public void apply(HashSet<Double> set, Double src) {
-			double minValue = src;
-			for (Double d : set) {
-				if (d < minValue) {
-					minValue = d;
-				}
-			}
-
-			// This is the condition that enables the termination of the iteration
-			if (minValue < src) {
-				setResult(minValue);
-			}
-		}
-	};
-
-	// --------------------------------------------------------------------------------------------
-	//  Util methods
-	// --------------------------------------------------------------------------------------------
-
-	private static boolean fileOutput = false;
-	private static String vertexInputPath = null;
-	private static String edgeInputPath = null;
-	private static String outputPath = null;
-
-	private static int maxIterations = 16;
-
-	private static boolean parseParameters(String[] args) {
-
-		if(args.length > 0) {
-			// parse input arguments
-			fileOutput = true;
-
-			if(args.length != 4) {
-				System.err.println("Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> " +
-						"<result path> <max iterations>");
-				return false;
-			}
-
-			vertexInputPath = args[0];
-			edgeInputPath = args[1];
-			outputPath = args[2];
-			maxIterations = Integer.parseInt(args[3]);
-		} else {
-			System.out.println("Executing GSA Greedy Graph Coloring example with built-in default data.");
-			System.out.println("  Provide parameters to read input data from files.");
-			System.out.println("  See the documentation for the correct format of input files.");
-			System.out.println("  Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> "
-					+ "<result path> <max iterations>");
-		}
-		return true;
-	}
-
-	private static DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			return env
-					.readCsvFile(vertexInputPath)
-					.fieldDelimiter(" ")
-					.lineDelimiter("\n")
-					.types(Long.class, Double.class)
-					.map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
-						@Override
-						public Vertex<Long, Double> map(Tuple2<Long, Double> value) throws Exception {
-							return new Vertex<Long, Double>(value.f0, value.f1);
-						}
-					});
-		}
-
-		return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Double>>() {
-			@Override
-			public Vertex<Long, Double> map(Long value) throws Exception {
-				return new Vertex<Long, Double>(value, (double) value);
-			}
-		});
-	}
-
-	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
-		if(fileOutput) {
-			return env.readCsvFile(edgeInputPath)
-					.fieldDelimiter(" ")
-					.lineDelimiter("\n")
-					.types(Long.class, Long.class, Double.class)
-					.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
-						@Override
-						public Edge<Long, Double> map(Tuple3<Long, Long, Double> value) throws Exception {
-							return new Edge<Long, Double>(value.f0, value.f1, value.f2);
-						}
-					});
-		}
-
-		return env.generateSequence(0, 5).flatMap(new FlatMapFunction<Long, Edge<Long, Double>>() {
-			@Override
-			public void flatMap(Long value, Collector<Edge<Long, Double>> out) throws Exception {
-				out.collect(new Edge<Long, Double>(value, (value + 1) % 6, 0.0));
-				out.collect(new Edge<Long, Double>(value, (value + 2) % 6, 0.0));
-			}
-		});
-	}
-
-	@Override
-	public String getDescription() {
-		return "GSA Greedy Graph Coloring";
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
index 9befccb..8d4b4d8 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
@@ -23,7 +23,8 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import java.io.Serializable;
 
 /**
- * A wrapper around Tuple3<VV, EV, VV> for convenience in the GatherFunction
+ * This class represents a <sourceVertex, edge> pair
+ * This is a wrapper around Tuple2<VV, EV> for convenience in the GatherFunction
  * @param <VV> the vertex value type
  * @param <EV> the edge value type
  */

http://git-wip-us.apache.org/repos/asf/flink/blob/e1f56e9d/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 321d530..0f5fe47 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -20,7 +20,7 @@ package org.apache.flink.graph.test;
 
 import com.google.common.base.Charsets;
 import com.google.common.io.Files;
-import org.apache.flink.graph.example.GSAGreedyGraphColoringExample;
+import org.apache.flink.graph.example.GSAConnectedComponentsExample;
 import org.apache.flink.graph.example.GSASingleSourceShortestPathsExample;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.junit.After;
@@ -68,17 +68,18 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 	}
 
 	// --------------------------------------------------------------------------------------------
-	//  Greedy Graph Coloring Test
+	//  Connected Components Test
 	// --------------------------------------------------------------------------------------------
 
 	@Test
 	public void testGreedyGraphColoring() throws Exception {
-		GSAGreedyGraphColoringExample.main(new String[] {verticesPath, edgesPath, resultPath, "16"});
-		expectedResult = "1 1.0\n" +
-				"2 1.0\n" +
-				"3 1.0\n" +
-				"4 1.0\n" +
-				"5 1.0\n";
+		GSAConnectedComponentsExample.main(new String[]{verticesPath, edgesPath, resultPath, "16"});
+		expectedResult = "1 1\n" +
+				"2 1\n" +
+				"3 1\n" +
+				"4 1\n" +
+				"5 1\n" +
+				"6 6\n";
 
 	}
 
@@ -93,7 +94,8 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 				"2 12.0\n" +
 				"3 13.0\n" +
 				"4 47.0\n" +
-				"5 48.0\n";
+				"5 48.0\n" +
+				"6 Infinity\n";
 	}
 
 
@@ -101,11 +103,12 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 	//  Sample data
 	// --------------------------------------------------------------------------------------------
 
-	private static final String VERTICES = "1 1.0\n" +
-			"2 2.0\n" +
-			"3 3.0\n" +
-			"4 4.0\n" +
-			"5 5.0\n";
+	private static final String VERTICES = "1 1\n" +
+			"2 2\n" +
+			"3 3\n" +
+			"4 4\n" +
+			"5 5\n" +
+			"6 6\n";
 
 	private static final String EDGES = "1 2 12.0\n" +
 			"1 3 13.0\n" +


[7/9] flink git commit: [FLINK-1514] [gelly] suppressed warnings in several places; adjusted tests

Posted by va...@apache.org.
[FLINK-1514] [gelly] suppressed warnings in several places; adjusted tests


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4e3165ee
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4e3165ee
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4e3165ee

Branch: refs/heads/master
Commit: 4e3165eec508c320f8c4f1fef7afe78697a64fad
Parents: 921414d
Author: vasia <va...@apache.org>
Authored: Tue Apr 21 01:31:50 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 20:00:59 2015 +0200

----------------------------------------------------------------------
 .../GSASingleSourceShortestPathsExample.java    |  2 +-
 .../apache/flink/graph/gsa/ApplyFunction.java   |  2 ++
 .../apache/flink/graph/gsa/GatherFunction.java  |  2 ++
 .../graph/gsa/GatherSumApplyIteration.java      |  3 +++
 .../org/apache/flink/graph/gsa/Neighbor.java    |  1 +
 .../org/apache/flink/graph/gsa/SumFunction.java |  2 ++
 .../flink/graph/test/GatherSumApplyITCase.java  | 20 ++++++++++----------
 7 files changed, 21 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index 488c66c..b4f1f34 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -85,7 +85,7 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 				return 0.0;
 			}
 			else {
-				return Double.MAX_VALUE;
+				return Double.POSITIVE_INFINITY;
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index d863da1..ab64c92 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -23,6 +23,7 @@ import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
 
+@SuppressWarnings("serial")
 public abstract class ApplyFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
 
 	public abstract void apply(M message, VV vertexValue);
@@ -54,6 +55,7 @@ public abstract class ApplyFunction<VV extends Serializable, EV extends Serializ
 	//  Internal methods
 	// --------------------------------------------------------------------------------------------
 
+	@SuppressWarnings("unused")
 	private IterationRuntimeContext runtimeContext;
 
 	private Collector<VV> out;

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index 0d110d5..0b0caf9 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
 
 import java.io.Serializable;
 
+@SuppressWarnings("serial")
 public abstract class GatherFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
 
 	public abstract M gather(Neighbor<VV, EV> neighbor);
@@ -44,6 +45,7 @@ public abstract class GatherFunction<VV extends Serializable, EV extends Seriali
 	//  Internal methods
 	// --------------------------------------------------------------------------------------------
 
+	@SuppressWarnings("unused")
 	private IterationRuntimeContext runtimeContext;
 
 	public void init(IterationRuntimeContext iterationRuntimeContext) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 67ae094..53fa7a4 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -165,6 +165,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 	//  Wrapping UDFs
 	// --------------------------------------------------------------------------------------------
 
+	@SuppressWarnings("serial")
 	private static final class GatherUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
 			EV extends Serializable, M> extends RichMapFunction<Tuple2<Vertex<K, VV>, Edge<K, EV>>,
 			Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
@@ -206,6 +207,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 		}
 	}
 
+	@SuppressWarnings("serial")
 	private static final class SumUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
 			EV extends Serializable, M> extends RichReduceFunction<Tuple2<K, M>>
 			implements ResultTypeQueryable<Tuple2<K, M>>{
@@ -244,6 +246,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 		}
 	}
 
+	@SuppressWarnings("serial")
 	private static final class ApplyUdf<K extends Comparable<K> & Serializable,
 			VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction<Tuple2<K, M>,
 			Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
index 2260022..39f18d5 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
@@ -28,6 +28,7 @@ import java.io.Serializable;
  * @param <VV> the vertex value type
  * @param <EV> the edge value type
  */
+@SuppressWarnings("serial")
 public class Neighbor<VV extends Serializable, EV extends Serializable>
 		extends Tuple2<VV, EV> {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
index b616194..6ed85c4 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.IterationRuntimeContext;
 
 import java.io.Serializable;
 
+@SuppressWarnings("serial")
 public abstract class SumFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
 
 	public abstract M sum(M arg0, M arg1);
@@ -44,6 +45,7 @@ public abstract class SumFunction<VV extends Serializable, EV extends Serializab
 	//  Internal methods
 	// --------------------------------------------------------------------------------------------
 
+	@SuppressWarnings("unused")
 	private IterationRuntimeContext runtimeContext;
 
 	public void init(IterationRuntimeContext iterationRuntimeContext) {

http://git-wip-us.apache.org/repos/asf/flink/blob/4e3165ee/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 43377d6..23ebfa5 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -68,7 +68,7 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 	// --------------------------------------------------------------------------------------------
 
 	@Test
-	public void testGreedyGraphColoring() throws Exception {
+	public void testConnectedComponents() throws Exception {
 		GSAConnectedComponentsExample.main(new String[]{edgesPath, resultPath, "16"});
 		expectedResult = "1 1\n" +
 				"2 1\n" +
@@ -86,7 +86,7 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 
 	@Test
 	public void testSingleSourceShortestPath() throws Exception {
-		GSASingleSourceShortestPathsExample.main(new String[]{edgesPath, resultPath, "1", "16"});
+		GSASingleSourceShortestPathsExample.main(new String[]{"1", edgesPath, resultPath, "16"});
 		expectedResult = "1 0.0\n" +
 				"2 12.0\n" +
 				"3 13.0\n" +
@@ -101,12 +101,12 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 	//  Sample data
 	// --------------------------------------------------------------------------------------------
 
-	private static final String EDGES = "1 2 12.0\n" +
-			"1 3 13.0\n" +
-			"2 3 23.0\n" +
-			"3 4 34.0\n" +
-			"3 5 35.0\n" +
-			"4 5 45.0\n" +
-			"5 1 51.0\n" +
-			"6 7 67.0\n";
+	private static final String EDGES = "1	2	12.0\n" +
+			"1	3	13.0\n" +
+			"2	3	23.0\n" +
+			"3	4	34.0\n" +
+			"3	5	35.0\n" +
+			"4	5	45.0\n" +
+			"5	1	51.0\n" +
+			"6	7	67.0\n";
 }


[9/9] flink git commit: [FLINK-1514] [gelly] Removed edge value type from ApplyFunction; Reuse the output vertex

Posted by va...@apache.org.
[FLINK-1514] [gelly] Removed edge value type from ApplyFunction; Reuse the output vertex

This closes #408


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6e24879b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6e24879b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6e24879b

Branch: refs/heads/master
Commit: 6e24879b96bcd5eee9f1b4af504eced11a901123
Parents: 40f5f3a
Author: vasia <va...@apache.org>
Authored: Fri Apr 24 00:16:37 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sun Apr 26 13:26:02 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-gelly/pom.xml               |  0
 .../main/java/org/apache/flink/graph/Graph.java |  4 +--
 .../example/GSAConnectedComponentsExample.java  |  2 +-
 .../GSASingleSourceShortestPathsExample.java    |  2 +-
 .../apache/flink/graph/gsa/ApplyFunction.java   | 16 ++++++----
 .../graph/gsa/GatherSumApplyIteration.java      | 31 ++++++--------------
 6 files changed, 24 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
old mode 100755
new mode 100644

http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 330c951..f843827 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -214,7 +214,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 					public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
 						return new Vertex<K, VV>(value.f0, mapper.map(value.f0));
 					}
-				}).returns(returnType);
+				}).returns(returnType).withForwardedFields("f0");
 
 		return new Graph<K, VV, EV>(vertices, edges, context);
 	}
@@ -1213,7 +1213,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 */
 	public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
 			GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
-			ApplyFunction<VV, EV, M> applyFunction, int maximumNumberOfIterations) {
+			ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {
 
 		GatherSumApplyIteration<K, VV, EV, M> iteration = GatherSumApplyIteration.withEdges(
 				edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);

http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index dba28cd..7c39123 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -102,7 +102,7 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 	};
 
 	@SuppressWarnings("serial")
-	private static final class UpdateComponentId extends ApplyFunction<Long, NullValue, Long> {
+	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
 
 		public void apply(Long summedValue, Long origValue) {
 			if (summedValue < origValue) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index 166a3d3..75cbd78 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -111,7 +111,7 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 	};
 
 	@SuppressWarnings("serial")
-	private static final class UpdateDistance extends ApplyFunction<Double, Double, Double> {
+	private static final class UpdateDistance extends ApplyFunction<Long, Double, Double> {
 
 		public void apply(Double newDistance, Double oldDistance) {
 			if (newDistance < oldDistance) {

http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index ab64c92..75f64f9 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -19,14 +19,16 @@
 package org.apache.flink.graph.gsa;
 
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Collector;
 
 import java.io.Serializable;
 
 @SuppressWarnings("serial")
-public abstract class ApplyFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
+public abstract class ApplyFunction<K extends Comparable<K> & Serializable, VV extends Serializable, M>
+	implements Serializable {
 
-	public abstract void apply(M message, VV vertexValue);
+	public abstract void apply(M newValue, VV currentValue);
 
 	/**
 	 * Sets the result for the apply function
@@ -34,7 +36,8 @@ public abstract class ApplyFunction<VV extends Serializable, EV extends Serializ
 	 * @param result the result of the apply phase
 	 */
 	public void setResult(VV result) {
-		out.collect(result);
+		outVal.f1 = result;
+		out.collect(outVal);
 	}
 
 	/**
@@ -58,14 +61,17 @@ public abstract class ApplyFunction<VV extends Serializable, EV extends Serializ
 	@SuppressWarnings("unused")
 	private IterationRuntimeContext runtimeContext;
 
-	private Collector<VV> out;
+	private Collector<Vertex<K, VV>> out;
+
+	private Vertex<K, VV> outVal;
 
 	public void init(IterationRuntimeContext iterationRuntimeContext) {
 		this.runtimeContext = iterationRuntimeContext;
 	};
 
-	public void setOutput(Collector<VV> out) {
+	public void setOutput(Vertex<K, VV> vertex, Collector<Vertex<K, VV>> out) {
 		this.out = out;
+		this.outVal = vertex;
 	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/6e24879b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 7fcd427..22be591 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -58,13 +58,13 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 
 	private final GatherFunction<VV, EV, M> gather;
 	private final SumFunction<VV, EV, M> sum;
-	private final ApplyFunction<VV, EV, M> apply;
+	private final ApplyFunction<K, VV, M> apply;
 	private final int maximumNumberOfIterations;
 
 	// ----------------------------------------------------------------------------------
 
 	private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
-			ApplyFunction<VV, EV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
+			ApplyFunction<K, VV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
 
 		Validate.notNull(gather);
 		Validate.notNull(sum);
@@ -161,7 +161,7 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 	 */
 	public static final <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, M>
 			GatherSumApplyIteration<K, VV, EV, M> withEdges(DataSet<Edge<K, EV>> edges,
-			GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<VV, EV, M> apply,
+			GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply,
 			int maximumNumberOfIterations) {
 		return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
 	}
@@ -253,32 +253,19 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 			VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction<Tuple2<K, M>,
 			Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {
 
-		private final ApplyFunction<VV, EV, M> applyFunction;
+		private final ApplyFunction<K, VV, M> applyFunction;
 		private transient TypeInformation<Vertex<K, VV>> resultType;
 
-		private ApplyUdf(ApplyFunction<VV, EV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
+		private ApplyUdf(ApplyFunction<K, VV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
 			this.applyFunction = applyFunction;
 			this.resultType = resultType;
 		}
 
 		@Override
-		public void join(Tuple2<K, M> arg0, Vertex<K, VV> arg1, final Collector<Vertex<K, VV>> out) throws Exception {
-
-			final K key = arg1.getId();
-			Collector<VV> userOut = new Collector<VV>() {
-				@Override
-				public void collect(VV record) {
-					out.collect(new Vertex<K, VV>(key, record));
-				}
-
-				@Override
-				public void close() {
-					out.close();
-				}
-			};
-
-			this.applyFunction.setOutput(userOut);
-			this.applyFunction.apply(arg0.f1, arg1.getValue());
+		public void join(Tuple2<K, M> newValue, final Vertex<K, VV> currentValue, final Collector<Vertex<K, VV>> out) throws Exception {
+
+			this.applyFunction.setOutput(currentValue, out);
+			this.applyFunction.apply(newValue.f1, currentValue.getValue());
 		}
 
 		@Override


[3/9] flink git commit: [FLINK-1514] [gelly] Add a Gather-Sum-Apply iteration method

Posted by va...@apache.org.
[FLINK-1514] [gelly] Add a Gather-Sum-Apply iteration method


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/722719f2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/722719f2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/722719f2

Branch: refs/heads/master
Commit: 722719f2a2a357962b13beffc8f52ca3031e9926
Parents: b2aafe5
Author: Dániel Bali <ba...@gmail.com>
Authored: Mon Feb 16 15:06:05 2015 +0100
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 19:52:23 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-gelly/pom.xml               |  31 ++
 .../main/java/org/apache/flink/graph/Graph.java |  25 +-
 .../example/GSAGreedyGraphColoringExample.java  | 224 ++++++++++++
 .../GSASingleSourceShortestPathsExample.java    | 215 ++++++++++++
 .../apache/flink/graph/gsa/ApplyFunction.java   |  69 ++++
 .../apache/flink/graph/gsa/GatherFunction.java  |  52 +++
 .../graph/gsa/GatherSumApplyIteration.java      | 340 +++++++++++++++++++
 .../org/apache/flink/graph/gsa/RichEdge.java    |  46 +++
 .../org/apache/flink/graph/gsa/SumFunction.java |  52 +++
 .../flink/graph/test/GatherSumApplyITCase.java  | 118 +++++++
 10 files changed, 1165 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
index a36ab4b..5245667 100644
--- a/flink-staging/flink-gelly/pom.xml
+++ b/flink-staging/flink-gelly/pom.xml
@@ -57,4 +57,35 @@ under the License.
 			<version>${guava.version}</version>
 		</dependency>
 	</dependencies>
+
+    <!-- See main pom.xml for explanation of profiles -->
+    <profiles>
+        <profile>
+            <id>hadoop-1</id>
+            <activation>
+                <property>
+                    <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
+                    <!--hadoop1--><name>hadoop.profile</name><value>1</value>
+                </property>
+            </activation>
+            <dependencies>
+                <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
+                <dependency>
+                    <groupId>com.google.guava</groupId>
+                    <artifactId>guava</artifactId>
+                    <version>${guava.version}</version>
+                    <scope>provided</scope>
+                </dependency>
+            </dependencies>
+        </profile>
+        <profile>
+            <id>hadoop-2</id>
+            <activation>
+                <property>
+                    <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
+                    <!--hadoop2--><name>!hadoop.profile</name>
+                </property>
+            </activation>
+        </profile>
+    </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
old mode 100644
new mode 100755
index 62173e3..2a66aca
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -43,15 +43,18 @@ import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.graph.spargel.IterationConfiguration;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.graph.utils.EdgeToTuple3Map;
+import org.apache.flink.graph.utils.GraphUtils;
 import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
@@ -79,7 +82,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	private final DataSet<Edge<K, EV>> edges;
 
 	/**
-	 * Creates a graph from two DataSets: vertices and edges
+	 * Creates a graph from two DataSets: vertices and edges and allow setting
+	 * the undirected property
 	 * 
 	 * @param vertices a DataSet of vertices.
 	 * @param edges a DataSet of edges.
@@ -347,7 +351,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 
 					@Override
 					public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet,
-									Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
+					                 Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
 
 						collector.collect(new Triplet<K, VV, EV>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
 								tripletWithSrcValSet.f2, vertex.getValue(), tripletWithSrcValSet.f3));
@@ -914,7 +918,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	}
 
 	/**
-	 * @return a long integer representing the number of edges
+	 * @return Singleton DataSet containing the edge count
 	 */
 	public long numberOfEdges() throws Exception {
 		return edges.count();
@@ -1011,6 +1015,13 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
+	private static final class CheckIfOneComponentMapper implements	MapFunction<Integer, Boolean> {
+		@Override
+		public Boolean map(Integer n) {
+			return (n == 1);
+		}
+	}
+
 	/**
 	 * Adds the input vertex and edges to the graph. If the vertex already
 	 * exists in the graph, it will not be added again, but the given edges
@@ -1165,7 +1176,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 			int maximumNumberOfIterations) {
 
 		return this.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
-			maximumNumberOfIterations, null);
+				maximumNumberOfIterations, null);
 	}
 
 	/**
@@ -1397,4 +1408,4 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 			return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class,	function.getClass(), 3, null, null);
 		}
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
new file mode 100755
index 0000000..4acd86c
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAGreedyGraphColoringExample.java
@@ -0,0 +1,257 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.util.Collector;
+
+import java.util.HashSet;
+
+/**
+ * This is an implementation of the Greedy Graph Coloring algorithm, using a gather-sum-apply iteration.
+ *
+ * <p>In every superstep, the values of the vertices updated in the previous superstep (all vertices in
+ * the first one) are gathered along their outgoing edges. Each target vertex merges the values it
+ * received into a set and adopts the smallest one if it is smaller than its current value. The
+ * iteration stops once no vertex value changes, or after the maximum number of iterations.
+ *
+ * <p>NOTE(review): as written, this is minimum-value propagation rather than a proper coloring, so
+ * adjacent vertices may end up with the same value. Confirm the intended algorithm.
+ */
+public class GSAGreedyGraphColoringExample implements ProgramDescription {
+
+	// --------------------------------------------------------------------------------------------
+	//  Program
+	// --------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
+		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+		// Gather the value of each edge's source vertex into a one-element set
+		GatherFunction<Double, Double, HashSet<Double>> gather = new GreedyGraphColoringGather();
+
+		// Merge the sets gathered for the same target vertex
+		SumFunction<Double, Double, HashSet<Double>> sum = new GreedyGraphColoringSum();
+
+		// Adopt the minimum value of the merged set if it is smaller than the current vertex value
+		ApplyFunction<Double, Double, HashSet<Double>> apply = new GreedyGraphColoringApply();
+
+		// Execute the GSA iteration
+		GatherSumApplyIteration<Long, Double, Double, HashSet<Double>> iteration =
+				graph.createGatherSumApplyIteration(gather, sum, apply, maxIterations);
+		Graph<Long, Double, Double> result = graph.runGatherSumApplyIteration(iteration);
+
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Double>> greedyGraphColoring = result.getVertices();
+
+		// emit result
+		if (fileOutput) {
+			greedyGraphColoring.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			greedyGraphColoring.print();
+		}
+
+		env.execute("GSA Greedy Graph Coloring");
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Greedy Graph Coloring UDFs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Emits the value of the edge's source vertex, wrapped in a one-element set.
+	 */
+	private static final class GreedyGraphColoringGather
+			extends GatherFunction<Double, Double, HashSet<Double>> {
+		@Override
+		public HashSet<Double> gather(RichEdge<Double, Double> richEdge) {
+
+			HashSet<Double> result = new HashSet<Double>();
+			result.add(richEdge.getSrcVertexValue());
+
+			return result;
+		}
+	}
+
+	/**
+	 * Merges two gathered sets into a new set that contains the elements of both.
+	 */
+	private static final class GreedyGraphColoringSum
+			extends SumFunction<Double, Double, HashSet<Double>> {
+		@Override
+		public HashSet<Double> sum(HashSet<Double> newValue, HashSet<Double> currentValue) {
+
+			HashSet<Double> result = new HashSet<Double>();
+			result.addAll(newValue);
+			result.addAll(currentValue);
+
+			return result;
+		}
+	}
+
+	/**
+	 * Sets the vertex value to the minimum of the merged set, but only if that minimum is smaller than
+	 * the current vertex value.
+	 */
+	private static final class GreedyGraphColoringApply
+			extends ApplyFunction<Double, Double, HashSet<Double>> {
+		@Override
+		public void apply(HashSet<Double> set, Double vertexValue) {
+			double minValue = vertexValue;
+			for (Double d : set) {
+				if (d < minValue) {
+					minValue = d;
+				}
+			}
+
+			// Only emitting changed values lets the iteration terminate once nothing changes
+			if (minValue < vertexValue) {
+				setResult(minValue);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Util methods
+	// --------------------------------------------------------------------------------------------
+
+	private static boolean fileOutput = false;
+	private static String vertexInputPath = null;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static int maxIterations = 16;
+
+	/**
+	 * Parses the command line arguments. Without arguments the built-in default data is used; otherwise
+	 * exactly four arguments are expected: vertex path, edge path, result path and maximum iterations.
+	 *
+	 * @param args the command line arguments
+	 * @return {@code false} if the arguments are invalid and the program should exit
+	 */
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+
+			if (args.length != 4) {
+				System.err.println("Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> " +
+						"<result path> <max iterations>");
+				return false;
+			}
+
+			vertexInputPath = args[0];
+			edgeInputPath = args[1];
+			outputPath = args[2];
+			maxIterations = Integer.parseInt(args[3]);
+		} else {
+			System.out.println("Executing GSA Greedy Graph Coloring example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: GSAGreedyGraphColoringExample <vertex path> <edge path> "
+					+ "<result path> <max iterations>");
+		}
+		return true;
+	}
+
+	/**
+	 * Reads the vertices from {@code vertexInputPath} ("id value" per line) when input files were given;
+	 * otherwise generates the vertices 0 to 5, each with its id as value.
+	 */
+	private static DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env
+					.readCsvFile(vertexInputPath)
+					.fieldDelimiter(" ")
+					.lineDelimiter("\n")
+					.types(Long.class, Double.class)
+					.map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
+						@Override
+						public Vertex<Long, Double> map(Tuple2<Long, Double> value) throws Exception {
+							return new Vertex<Long, Double>(value.f0, value.f1);
+						}
+					});
+		}
+
+		return env.generateSequence(0, 5).map(new MapFunction<Long, Vertex<Long, Double>>() {
+			@Override
+			public Vertex<Long, Double> map(Long value) throws Exception {
+				return new Vertex<Long, Double>(value, (double) value);
+			}
+		});
+	}
+
+	/**
+	 * Reads the edges from {@code edgeInputPath} ("source target value" per line) when input files were
+	 * given; otherwise connects every default vertex i to (i + 1) % 6 and (i + 2) % 6 with edge value 0.0.
+	 */
+	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter(" ")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class, Double.class)
+					.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
+						@Override
+						public Edge<Long, Double> map(Tuple3<Long, Long, Double> value) throws Exception {
+							return new Edge<Long, Double>(value.f0, value.f1, value.f2);
+						}
+					});
+		}
+
+		return env.generateSequence(0, 5).flatMap(new FlatMapFunction<Long, Edge<Long, Double>>() {
+			@Override
+			public void flatMap(Long value, Collector<Edge<Long, Double>> out) throws Exception {
+				out.collect(new Edge<Long, Double>(value, (value + 1) % 6, 0.0));
+				out.collect(new Edge<Long, Double>(value, (value + 2) % 6, 0.0));
+			}
+		});
+	}
+
+	@Override
+	public String getDescription() {
+		return "GSA Greedy Graph Coloring";
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
new file mode 100755
index 0000000..9c8328b
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -0,0 +1,251 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.GatherSumApplyIteration;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.RichEdge;
+
+import java.io.Serializable;
+
+/**
+ * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration.
+ *
+ * <p>The source vertex starts with distance 0 and all other vertices with infinity. In every superstep,
+ * candidate distances (source distance plus edge value) are gathered along the outgoing edges of the
+ * vertices updated in the previous superstep (all vertices in the first one); each target vertex keeps
+ * the smallest candidate and adopts it if it is shorter than its current distance.
+ */
+public class GSASingleSourceShortestPathsExample implements ProgramDescription {
+
+	// --------------------------------------------------------------------------------------------
+	//  Program
+	// --------------------------------------------------------------------------------------------
+
+	public static void main(String[] args) throws Exception {
+
+		if (!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Vertex<Long, Double>> vertices = getVertexDataSet(env);
+		DataSet<Edge<Long, Double>> edges = getEdgeDataSet(env);
+
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+		// The path from src to trg through edge e costs src + e
+		GatherFunction<Double, Double, Double> gather = new SingleSourceShortestPathGather();
+
+		// Return the smaller path length to minimize distance
+		SumFunction<Double, Double, Double> sum = new SingleSourceShortestPathSum();
+
+		// Iterate as long as the distance is updated
+		ApplyFunction<Double, Double, Double> apply = new SingleSourceShortestPathApply();
+
+		// Execute the GSA iteration
+		GatherSumApplyIteration<Long, Double, Double, Double> iteration = graph.createGatherSumApplyIteration(
+				gather, sum, apply, maxIterations);
+		Graph<Long, Double, Double> result = graph.mapVertices(new InitVerticesMapper<Long>(srcVertexId))
+				.runGatherSumApplyIteration(iteration);
+
+		// Extract the vertices as the result
+		DataSet<Vertex<Long, Double>> singleSourceShortestPaths = result.getVertices();
+
+		// emit result
+		if (fileOutput) {
+			singleSourceShortestPaths.writeAsCsv(outputPath, "\n", " ");
+		} else {
+			singleSourceShortestPaths.print();
+		}
+
+		env.execute("GSA Single Source Shortest Paths Example");
+	}
+
+	/**
+	 * Initializes the vertex values: 0.0 for the source vertex, positive infinity for all other vertices.
+	 *
+	 * @param <K> the vertex key type
+	 */
+	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>
+			implements MapFunction<Vertex<K, Double>, Double> {
+
+		private final K srcVertexId;
+
+		public InitVerticesMapper(K srcId) {
+			this.srcVertexId = srcId;
+		}
+
+		@Override
+		public Double map(Vertex<K, Double> value) {
+			if (value.f0.equals(srcVertexId)) {
+				return 0.0;
+			} else {
+				return Double.POSITIVE_INFINITY;
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Single Source Shortest Path UDFs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Computes the candidate distance of the edge's target: source distance plus edge value.
+	 */
+	private static final class SingleSourceShortestPathGather
+			extends GatherFunction<Double, Double, Double> {
+		@Override
+		public Double gather(RichEdge<Double, Double> richEdge) {
+			return richEdge.getSrcVertexValue() + richEdge.getEdgeValue();
+		}
+	}
+
+	/**
+	 * Keeps the smaller of two candidate distances.
+	 */
+	private static final class SingleSourceShortestPathSum
+			extends SumFunction<Double, Double, Double> {
+		@Override
+		public Double sum(Double newValue, Double currentValue) {
+			return Math.min(newValue, currentValue);
+		}
+	}
+
+	/**
+	 * Updates the vertex distance only if the best candidate is shorter, so that vertices whose distance
+	 * did not change are not emitted.
+	 */
+	private static final class SingleSourceShortestPathApply
+			extends ApplyFunction<Double, Double, Double> {
+		@Override
+		public void apply(Double summed, Double currentDistance) {
+			if (summed < currentDistance) {
+				setResult(summed);
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Util methods
+	// --------------------------------------------------------------------------------------------
+
+	private static boolean fileOutput = false;
+	private static String vertexInputPath = null;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static int maxIterations = 2;
+	private static long srcVertexId = 1;
+
+	/**
+	 * Parses the command line arguments. Without arguments the built-in default data is used; otherwise
+	 * exactly five arguments are expected: vertex path, edge path, result path, source id and iterations.
+	 *
+	 * @param args the command line arguments
+	 * @return {@code false} if the arguments are invalid and the program should exit
+	 */
+	private static boolean parseParameters(String[] args) {
+
+		if (args.length > 0) {
+			// parse input arguments
+			fileOutput = true;
+
+			if (args.length != 5) {
+				System.err.println("Usage: GSASingleSourceShortestPathsExample <vertex path> <edge path> " +
+						"<result path> <src vertex> <max iterations>");
+				return false;
+			}
+
+			vertexInputPath = args[0];
+			edgeInputPath = args[1];
+			outputPath = args[2];
+			srcVertexId = Long.parseLong(args[3]);
+			maxIterations = Integer.parseInt(args[4]);
+		} else {
+			System.out.println("Executing GSA Single Source Shortest Paths example with built-in default data.");
+			System.out.println("  Provide parameters to read input data from files.");
+			System.out.println("  See the documentation for the correct format of input files.");
+			System.out.println("  Usage: GSASingleSourceShortestPathsExample <vertex path> <edge path> "
+					+ "<result path> <src vertex> <max iterations>");
+		}
+		return true;
+	}
+
+	/**
+	 * Reads the vertices from {@code vertexInputPath} ("id value" per line) when input files were given;
+	 * otherwise returns the default vertices of {@link SingleSourceShortestPathsData}.
+	 */
+	private static DataSet<Vertex<Long, Double>> getVertexDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env
+					.readCsvFile(vertexInputPath)
+					.fieldDelimiter(" ")
+					.lineDelimiter("\n")
+					.types(Long.class, Double.class)
+					.map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
+						@Override
+						public Vertex<Long, Double> map(Tuple2<Long, Double> value) throws Exception {
+							return new Vertex<Long, Double>(value.f0, value.f1);
+						}
+					});
+		} else {
+			return SingleSourceShortestPathsData.getDefaultVertexDataSet(env);
+		}
+	}
+
+	/**
+	 * Reads the edges from {@code edgeInputPath} ("source target value" per line) when input files were
+	 * given; otherwise returns the default edges of {@link SingleSourceShortestPathsData}.
+	 */
+	private static DataSet<Edge<Long, Double>> getEdgeDataSet(ExecutionEnvironment env) {
+		if (fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.fieldDelimiter(" ")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class, Double.class)
+					.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
+						@Override
+						public Edge<Long, Double> map(Tuple3<Long, Long, Double> value) throws Exception {
+							return new Edge<Long, Double>(value.f0, value.f1, value.f2);
+						}
+					});
+		} else {
+			return SingleSourceShortestPathsData.getDefaultEdgeDataSet(env);
+		}
+	}
+
+	@Override
+	public String getDescription() {
+		return "GSA Single Source Shortest Paths";
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
new file mode 100755
index 0000000..d863da1
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * The apply phase of a gather-sum-apply iteration. It receives the summed value of a vertex together
+ * with the vertex's current value and may set a new value for the vertex.
+ *
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <M> the type of the summed value
+ */
+public abstract class ApplyFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
+
+	/**
+	 * Computes the new value of a vertex. Call {@link #setResult} to update the vertex; if it is not called,
+	 * the vertex keeps its current value.
+	 *
+	 * @param message the summed value for the vertex
+	 * @param vertexValue the current value of the vertex
+	 */
+	public abstract void apply(M message, VV vertexValue);
+
+	/**
+	 * Sets the result for the apply function, i.e. the new value of the current vertex.
+	 *
+	 * @param result the result of the apply phase
+	 */
+	public void setResult(VV result) {
+		out.collect(result);
+	}
+
+	/**
+	 * This method is executed once per superstep before the apply function is invoked for each vertex.
+	 */
+	public void preSuperstep() {}
+
+	/**
+	 * This method is executed once per superstep after the apply function has been invoked for each vertex.
+	 */
+	public void postSuperstep() {}
+
+	/**
+	 * Gets the number of the superstep, starting at <strong>1</strong>.
+	 *
+	 * @return The number of the current superstep.
+	 */
+	public int getSuperstepNumber() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Internal methods
+	// --------------------------------------------------------------------------------------------
+
+	private IterationRuntimeContext runtimeContext;
+
+	private Collector<VV> out;
+
+	/**
+	 * Stores the iteration runtime context; invoked by the iteration's apply wrapper in the first superstep.
+	 *
+	 * @param iterationRuntimeContext the runtime context of the iteration
+	 */
+	public void init(IterationRuntimeContext iterationRuntimeContext) {
+		this.runtimeContext = iterationRuntimeContext;
+	}
+
+	/**
+	 * Sets the collector that {@link #setResult} emits to; invoked by the apply wrapper before each apply call.
+	 *
+	 * @param out the collector for the new vertex value
+	 */
+	public void setOutput(Collector<VV> out) {
+		this.out = out;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
new file mode 100755
index 0000000..91a468d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+
+import java.io.Serializable;
+
+/**
+ * The gather phase of a gather-sum-apply iteration. It is invoked for every outgoing edge of the vertices
+ * in the current workset and computes a value for the edge's target vertex.
+ *
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <M> the type of the gathered value
+ */
+public abstract class GatherFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
+
+	/**
+	 * Computes the value gathered along a single edge.
+	 *
+	 * @param richEdge the value of the edge's source vertex together with the edge value
+	 * @return the value gathered for the edge's target vertex
+	 */
+	public abstract M gather(RichEdge<VV, EV> richEdge);
+
+	/**
+	 * This method is executed once per superstep before the gather function is invoked for each edge.
+	 */
+	public void preSuperstep() {}
+
+	/**
+	 * This method is executed once per superstep after the gather function has been invoked for each edge.
+	 */
+	public void postSuperstep() {}
+
+	/**
+	 * Gets the number of the superstep, starting at <strong>1</strong>.
+	 *
+	 * @return The number of the current superstep.
+	 */
+	public int getSuperstepNumber() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Internal methods
+	// --------------------------------------------------------------------------------------------
+
+	private IterationRuntimeContext runtimeContext;
+
+	/**
+	 * Stores the iteration runtime context; invoked by the iteration's gather wrapper in the first superstep.
+	 *
+	 * @param iterationRuntimeContext the runtime context of the iteration
+	 */
+	public void init(IterationRuntimeContext iterationRuntimeContext) {
+		this.runtimeContext = iterationRuntimeContext;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
new file mode 100755
index 0000000..426efbb
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+
+import java.io.Serializable;
+
+/**
+ * This class represents iterative graph computations, programmed in a gather-sum-apply perspective.
+ *
+ * <p>In every superstep, the vertices of the workset (initially all vertices) are joined with their
+ * outgoing edges, the gather function produces a value per edge for its target vertex, the sum function
+ * reduces these values per target, and the apply function may set a new value for the target. The
+ * updated vertices form the next workset; the iteration ends when no vertex is updated or the maximum
+ * number of iterations is reached.
+ *
+ * @param <K> The type of the vertex key in the graph
+ * @param <VV> The type of the vertex value in the graph
+ * @param <EV> The type of the edge value in the graph
+ * @param <M> The intermediate type used by the gather, sum and apply functions
+ */
+public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
+		VV extends Serializable, EV extends Serializable, M> implements CustomUnaryOperation<Vertex<K, VV>,
+		Vertex<K, VV>> {
+
+	private DataSet<Vertex<K, VV>> vertexDataSet;
+	private final DataSet<Edge<K, EV>> edgeDataSet;
+
+	private final GatherFunction<VV, EV, M> gather;
+	private final SumFunction<VV, EV, M> sum;
+	private final ApplyFunction<VV, EV, M> apply;
+	private final int maximumNumberOfIterations;
+
+	private String name;
+	private int parallelism = -1;
+
+	// ----------------------------------------------------------------------------------
+
+	private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
+			ApplyFunction<VV, EV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
+
+		Validate.notNull(gather, "The gather function must not be null.");
+		Validate.notNull(sum, "The sum function must not be null.");
+		Validate.notNull(apply, "The apply function must not be null.");
+		Validate.notNull(edges, "The edge data set must not be null.");
+		Validate.isTrue(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+
+		this.gather = gather;
+		this.sum = sum;
+		this.apply = apply;
+		this.edgeDataSet = edges;
+		this.maximumNumberOfIterations = maximumNumberOfIterations;
+	}
+
+	/**
+	 * Sets the name for the gather-sum-apply iteration. The name is displayed in logs and messages.
+	 *
+	 * @param name The name for the iteration.
+	 */
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	/**
+	 * Gets the name from this gather-sum-apply iteration.
+	 *
+	 * @return The name of the iteration, or {@code null} if it has not been set.
+	 */
+	public String getName() {
+		return name;
+	}
+
+	/**
+	 * Sets the degree of parallelism for the iteration.
+	 *
+	 * @param parallelism The degree of parallelism, or -1 to use the default.
+	 */
+	public void setParallelism(int parallelism) {
+		Validate.isTrue(parallelism > 0 || parallelism == -1,
+				"The degree of parallelism must be positive, or -1 (use default).");
+		this.parallelism = parallelism;
+	}
+
+	/**
+	 * Gets the iteration's degree of parallelism.
+	 *
+	 * @return The iterations parallelism, or -1, if not set.
+	 */
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom Operator behavior
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Sets the input data set for this operator. In the case of this operator this input data set represents
+	 * the set of vertices with their initial state.
+	 *
+	 * @param dataSet The input data set, which in the case of this operator represents the set of
+	 *                vertices with their initial state.
+	 */
+	@Override
+	public void setInput(DataSet<Vertex<K, VV>> dataSet) {
+		this.vertexDataSet = dataSet;
+	}
+
+	/**
+	 * Computes the results of the gather-sum-apply iteration.
+	 *
+	 * @return The resulting DataSet
+	 * @throws IllegalStateException if the input vertex data set has not been set
+	 */
+	@Override
+	public DataSet<Vertex<K, VV>> createResult() {
+		if (vertexDataSet == null) {
+			throw new IllegalStateException("The input data set has not been set.");
+		}
+
+		// Prepare type information
+		TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertexDataSet.getType()).getTypeAt(0);
+		TypeInformation<M> messageType = TypeExtractor.createTypeInfo(GatherFunction.class, gather.getClass(), 2, null, null);
+		TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<Tuple2<K, M>>(keyType, messageType);
+		TypeInformation<Vertex<K, VV>> outputType = vertexDataSet.getType();
+
+		// Prepare UDFs
+		GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<K, VV, EV, M>(gather, innerType);
+		SumUdf<K, VV, EV, M> sumUdf = new SumUdf<K, VV, EV, M>(sum, innerType);
+		ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<K, VV, EV, M>(apply, outputType);
+
+		// Both the solution set and the workset are keyed on the vertex id (field 0)
+		final int[] zeroKeyPos = new int[] {0};
+		final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
+				vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
+
+		// Apply the user-configured name and parallelism, if any
+		if (this.name != null) {
+			iteration.name(this.name);
+		}
+		if (this.parallelism > 0) {
+			iteration.parallelism(this.parallelism);
+		}
+
+		// Prepare the rich edges: join the workset vertices with their outgoing edges (edge field 0 is the source)
+		DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> richEdges = iteration
+				.getWorkset()
+				.join(edgeDataSet)
+				.where(0)
+				.equalTo(0);
+
+		// Gather, sum and apply
+		DataSet<Tuple2<K, M>> gatheredSet = richEdges.map(gatherUdf);
+		DataSet<Tuple2<K, M>> summedSet = gatheredSet.groupBy(0).reduce(sumUdf);
+		DataSet<Vertex<K, VV>> appliedSet = summedSet
+				.join(iteration.getSolutionSet())
+				.where(0)
+				.equalTo(0)
+				.with(applyUdf);
+
+		// The updated vertices are both the solution set delta and the next workset
+		return iteration.closeWith(appliedSet, appliedSet);
+	}
+
+	/**
+	 * Creates a new gather-sum-apply iteration operator for graphs
+	 *
+	 * @param edges The edge DataSet
+	 *
+	 * @param gather The gather function of the GSA iteration
+	 * @param sum The sum function of the GSA iteration
+	 * @param apply The apply function of the GSA iteration
+	 *
+	 * @param maximumNumberOfIterations The maximum number of iterations executed
+	 *
+	 * @param <K> The type of the vertex key in the graph
+	 * @param <VV> The type of the vertex value in the graph
+	 * @param <EV> The type of the edge value in the graph
+	 * @param <M> The intermediate type used by the gather, sum and apply functions
+	 *
+	 * @return An instance of the gather-sum-apply graph computation operator.
+	 */
+	public static final <K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable, M>
+			GatherSumApplyIteration<K, VV, EV, M> withEdges(DataSet<Edge<K, EV>> edges,
+			GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum, ApplyFunction<VV, EV, M> apply,
+			int maximumNumberOfIterations) {
+		return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Wrapping UDFs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Wraps the {@link GatherFunction}: builds a {@link RichEdge} from the source vertex value and the edge
+	 * value, and emits the gathered value keyed by the edge's target vertex.
+	 */
+	private static final class GatherUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
+			EV extends Serializable, M> extends RichMapFunction<Tuple2<Vertex<K, VV>, Edge<K, EV>>,
+			Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
+
+		private final GatherFunction<VV, EV, M> gatherFunction;
+		private transient TypeInformation<Tuple2<K, M>> resultType;
+
+		private GatherUdf(GatherFunction<VV, EV, M> gatherFunction, TypeInformation<Tuple2<K, M>> resultType) {
+			this.gatherFunction = gatherFunction;
+			this.resultType = resultType;
+		}
+
+		@Override
+		public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> richEdge) throws Exception {
+			RichEdge<VV, EV> userRichEdge = new RichEdge<VV, EV>(richEdge.f0.getValue(),
+					richEdge.f1.getValue());
+
+			K key = richEdge.f1.getTarget();
+			M result = this.gatherFunction.gather(userRichEdge);
+			return new Tuple2<K, M>(key, result);
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.gatherFunction.init(getIterationRuntimeContext());
+			}
+			this.gatherFunction.preSuperstep();
+		}
+
+		@Override
+		public void close() throws Exception {
+			this.gatherFunction.postSuperstep();
+		}
+
+		@Override
+		public TypeInformation<Tuple2<K, M>> getProducedType() {
+			return this.resultType;
+		}
+	}
+
+	/**
+	 * Wraps the {@link SumFunction}: reduces the gathered values of the same target vertex pairwise.
+	 */
+	private static final class SumUdf<K extends Comparable<K> & Serializable, VV extends Serializable,
+			EV extends Serializable, M> extends RichReduceFunction<Tuple2<K, M>>
+			implements ResultTypeQueryable<Tuple2<K, M>>{
+
+		private final SumFunction<VV, EV, M> sumFunction;
+		private transient TypeInformation<Tuple2<K, M>> resultType;
+
+		private SumUdf(SumFunction<VV, EV, M> sumFunction, TypeInformation<Tuple2<K, M>> resultType) {
+			this.sumFunction = sumFunction;
+			this.resultType = resultType;
+		}
+
+		@Override
+		public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
+			K key = arg0.f0;
+			M result = this.sumFunction.sum(arg0.f1, arg1.f1);
+			return new Tuple2<K, M>(key, result);
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.sumFunction.init(getIterationRuntimeContext());
+			}
+			this.sumFunction.preSuperstep();
+		}
+
+		@Override
+		public void close() throws Exception {
+			this.sumFunction.postSuperstep();
+		}
+
+		@Override
+		public TypeInformation<Tuple2<K, M>> getProducedType() {
+			return this.resultType;
+		}
+	}
+
+	/**
+	 * Wraps the {@link ApplyFunction}: joins the summed value with the vertex from the solution set and
+	 * emits the vertex with every new value the apply function sets.
+	 */
+	private static final class ApplyUdf<K extends Comparable<K> & Serializable,
+			VV extends Serializable, EV extends Serializable, M> extends RichFlatJoinFunction<Tuple2<K, M>,
+			Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {
+
+		private final ApplyFunction<VV, EV, M> applyFunction;
+		private transient TypeInformation<Vertex<K, VV>> resultType;
+
+		private ApplyUdf(ApplyFunction<VV, EV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
+			this.applyFunction = applyFunction;
+			this.resultType = resultType;
+		}
+
+		@Override
+		public void join(Tuple2<K, M> arg0, Vertex<K, VV> arg1, final Collector<Vertex<K, VV>> out) throws Exception {
+
+			final K key = arg1.getId();
+			Collector<VV> userOut = new Collector<VV>() {
+				@Override
+				public void collect(VV record) {
+					out.collect(new Vertex<K, VV>(key, record));
+				}
+
+				@Override
+				public void close() {
+					out.close();
+				}
+			};
+
+			this.applyFunction.setOutput(userOut);
+			this.applyFunction.apply(arg0.f1, arg1.getValue());
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.applyFunction.init(getIterationRuntimeContext());
+			}
+			this.applyFunction.preSuperstep();
+		}
+
+		@Override
+		public void close() throws Exception {
+			this.applyFunction.postSuperstep();
+		}
+
+		@Override
+		public TypeInformation<Vertex<K, VV>> getProducedType() {
+			return this.resultType;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
new file mode 100755
index 0000000..9befccb
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.io.Serializable;
+
+/**
+ * A wrapper around {@code Tuple2<VV, EV>} (f0: source vertex value, f1: edge value) for convenience in the GatherFunction.
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+public class RichEdge<VV extends Serializable, EV extends Serializable>
+		extends Tuple2<VV, EV> {
+
+	public RichEdge() {}
+
+	public RichEdge(VV src, EV edge) {
+		super(src, edge);
+	}
+
+	public VV getSrcVertexValue() {
+		return this.f0;
+	}
+
+	public EV getEdgeValue() {
+		return this.f1;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
new file mode 100755
index 0000000..b616194
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+
+import java.io.Serializable;
+/** User-defined sum step of a gather-sum-apply iteration: merges two values of type M into one. */
+public abstract class SumFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
+	/** Merges two values into one; presumably applied in arbitrary order, so keep it associative and commutative (TODO confirm). */
+	public abstract M sum(M arg0, M arg1);
+
+	/**
+	 * Hook executed once per superstep, presumably before sum() is applied to that superstep's values.
+	 *
+	 * The default implementation does nothing.
+	 */
+	public void preSuperstep() {}
+
+	/**
+	 * Hook executed once per superstep, presumably after sum() has been applied to that superstep's values.
+	 *
+	 * The default implementation does nothing.
+	 */
+	public void postSuperstep() {}
+
+	// --------------------------------------------------------------------------------------------
+	//  Internal methods
+	// --------------------------------------------------------------------------------------------
+	// The context is stored by init(); NOTE(review): it is not read anywhere in this class.
+	private IterationRuntimeContext runtimeContext;
+
+	public void init(IterationRuntimeContext iterationRuntimeContext) {
+		this.runtimeContext = iterationRuntimeContext;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/722719f2/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
new file mode 100755
index 0000000..321d530
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.GSAGreedyGraphColoringExample;
+import org.apache.flink.graph.example.GSASingleSourceShortestPathsExample;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+/** Integration tests that run the GSA example programs on a small sample graph and compare their output with the expected result. */
+@RunWith(Parameterized.class)
+public class GatherSumApplyITCase extends MultipleProgramsTestBase {
+
+	public GatherSumApplyITCase(TestExecutionMode mode){
+		super(mode);
+	}
+	// file URIs are set in before(); expectedResult is assigned by each test and checked in after()
+	private String verticesPath;
+	private String edgesPath;
+	private String resultPath;
+	private String expectedResult;
+	// temporary folder in which before() creates the input files and the output file
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+		File verticesFile = tempFolder.newFile();
+		Files.write(GatherSumApplyITCase.VERTICES, verticesFile, Charsets.UTF_8);
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(GatherSumApplyITCase.EDGES, edgesFile, Charsets.UTF_8);
+
+		verticesPath = verticesFile.toURI().toString();
+		edgesPath = edgesFile.toURI().toString();
+
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Greedy Graph Coloring Test
+	// --------------------------------------------------------------------------------------------
+
+	@Test
+	public void testGreedyGraphColoring() throws Exception {
+		GSAGreedyGraphColoringExample.main(new String[] {verticesPath, edgesPath, resultPath, "16"});
+		expectedResult = "1 1.0\n" +
+				"2 1.0\n" +
+				"3 1.0\n" +
+				"4 1.0\n" +
+				"5 1.0\n";
+
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Single Source Shortest Path Test
+	// --------------------------------------------------------------------------------------------
+
+	@Test
+	public void testSingleSourceShortestPath() throws Exception {
+		GSASingleSourceShortestPathsExample.main(new String[]{verticesPath, edgesPath, resultPath, "1", "16"});
+		expectedResult = "1 0.0\n" +
+				"2 12.0\n" +
+				"3 13.0\n" +
+				"4 47.0\n" +
+				"5 48.0\n";
+	}
+
+
+	// --------------------------------------------------------------------------------------------
+	//  Sample data
+	// --------------------------------------------------------------------------------------------
+
+	private static final String VERTICES = "1 1.0\n" +
+			"2 2.0\n" +
+			"3 3.0\n" +
+			"4 4.0\n" +
+			"5 5.0\n";
+
+	private static final String EDGES = "1 2 12.0\n" +
+			"1 3 13.0\n" +
+			"2 3 23.0\n" +
+			"3 4 34.0\n" +
+			"3 5 35.0\n" +
+			"4 5 45.0\n" +
+			"5 1 51.0\n";
+
+}


[4/9] flink git commit: [FLINK-1514] [gelly] Fixed inconsistencies after merge

Posted by va...@apache.org.
[FLINK-1514] [gelly] Fixed inconsistencies after merge


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/837508df
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/837508df
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/837508df

Branch: refs/heads/master
Commit: 837508df876c77cf22d7b289e76547df6a485bf0
Parents: 740d437
Author: Dániel Bali <ba...@gmail.com>
Authored: Sun Apr 19 21:17:03 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 19:52:24 2015 +0200

----------------------------------------------------------------------
 flink-staging/flink-gelly/pom.xml               | 31 -------------
 .../main/java/org/apache/flink/graph/Graph.java | 12 +----
 .../example/GSAConnectedComponentsExample.java  | 10 ++---
 .../GSASingleSourceShortestPathsExample.java    |  4 +-
 .../apache/flink/graph/gsa/GatherFunction.java  |  2 +-
 .../graph/gsa/GatherSumApplyIteration.java      | 16 +++----
 .../org/apache/flink/graph/gsa/Neighbor.java    | 47 ++++++++++++++++++++
 .../org/apache/flink/graph/gsa/RichEdge.java    | 47 --------------------
 8 files changed, 65 insertions(+), 104 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/pom.xml
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/pom.xml b/flink-staging/flink-gelly/pom.xml
old mode 100644
new mode 100755
index 5245667..a36ab4b
--- a/flink-staging/flink-gelly/pom.xml
+++ b/flink-staging/flink-gelly/pom.xml
@@ -57,35 +57,4 @@ under the License.
 			<version>${guava.version}</version>
 		</dependency>
 	</dependencies>
-
-    <!-- See main pom.xml for explanation of profiles -->
-    <profiles>
-        <profile>
-            <id>hadoop-1</id>
-            <activation>
-                <property>
-                    <!-- Please do not remove the 'hadoop1' comment. See ./tools/generate_specific_pom.sh -->
-                    <!--hadoop1--><name>hadoop.profile</name><value>1</value>
-                </property>
-            </activation>
-            <dependencies>
-                <!-- Add this here, for hadoop-2 we don't need it since we get guava transitively -->
-                <dependency>
-                    <groupId>com.google.guava</groupId>
-                    <artifactId>guava</artifactId>
-                    <version>${guava.version}</version>
-                    <scope>provided</scope>
-                </dependency>
-            </dependencies>
-        </profile>
-        <profile>
-            <id>hadoop-2</id>
-            <activation>
-                <property>
-                    <!-- Please do not remove the 'hadoop2' comment. See ./tools/generate_specific_pom.sh -->
-                    <!--hadoop2--><name>!hadoop.profile</name>
-                </property>
-            </activation>
-        </profile>
-    </profiles>
 </project>

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index d564f24..330c951 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -83,8 +83,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	private final DataSet<Edge<K, EV>> edges;
 
 	/**
-	 * Creates a graph from two DataSets: vertices and edges and allow setting
-	 * the undirected property
+	 * Creates a graph from two DataSets: vertices and edges
 	 * 
 	 * @param vertices a DataSet of vertices.
 	 * @param edges a DataSet of edges.
@@ -919,7 +918,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	}
 
 	/**
-	 * @return Singleton DataSet containing the edge count
+	 * @return a long integer representing the number of edges
 	 */
 	public long numberOfEdges() throws Exception {
 		return edges.count();
@@ -1016,13 +1015,6 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class CheckIfOneComponentMapper implements	MapFunction<Integer, Boolean> {
-		@Override
-		public Boolean map(Integer n) {
-			return (n == 1);
-		}
-	}
-
 	/**
 	 * Adds the input vertex and edges to the graph. If the vertex already
 	 * exists in the graph, it will not be added again, but the given edges

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index d338b03..6a2c250 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.graph.gsa.Neighbor;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
@@ -70,13 +70,13 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 				graph.runGatherSumApplyIteration(gather, sum, apply, maxIterations);
 
 		// Extract the vertices as the result
-		DataSet<Vertex<Long, Long>> greedyGraphColoring = result.getVertices();
+		DataSet<Vertex<Long, Long>> connectedComponents = result.getVertices();
 
 		// emit result
 		if (fileOutput) {
-			greedyGraphColoring.writeAsCsv(outputPath, "\n", " ");
+			connectedComponents.writeAsCsv(outputPath, "\n", " ");
 		} else {
-			greedyGraphColoring.print();
+			connectedComponents.print();
 		}
 
 		env.execute("GSA Connected Components");
@@ -99,7 +99,7 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 	private static final class ConnectedComponentsGather
 			extends GatherFunction<Long, NullValue, Long> {
 		@Override
-		public Long gather(RichEdge<Long, NullValue> richEdge) {
+		public Long gather(Neighbor<Long, NullValue> richEdge) {
 
 			return richEdge.getSrcVertexValue();
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index 8967a90..cc3b054 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -31,7 +31,7 @@ import org.apache.flink.graph.example.utils.SingleSourceShortestPathsData;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.RichEdge;
+import org.apache.flink.graph.gsa.Neighbor;
 import org.apache.flink.util.Collector;
 
 /**
@@ -116,7 +116,7 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 	private static final class SingleSourceShortestPathGather
 			extends GatherFunction<Double, Double, Double> {
 		@Override
-		public Double gather(RichEdge<Double, Double> richEdge) {
+		public Double gather(Neighbor<Double, Double> richEdge) {
 			return richEdge.getSrcVertexValue() + richEdge.getEdgeValue();
 		}
 	};

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index 91a468d..0d110d5 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -24,7 +24,7 @@ import java.io.Serializable;
 
 public abstract class GatherFunction<VV extends Serializable, EV extends Serializable, M> implements Serializable {
 
-	public abstract M gather(RichEdge<VV, EV> richEdge);
+	public abstract M gather(Neighbor<VV, EV> neighbor);
 
 	/**
 	 * This method is executed once per superstep before the vertex update function is invoked for each vertex.

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 1adab29..67ae094 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -117,15 +117,15 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 		final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
 				vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
 
-		// Prepare the rich edges
-		DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> richEdges = iteration
+		// Prepare the neighbors
+		DataSet<Tuple2<Vertex<K, VV>, Edge<K, EV>>> neighbors = iteration
 				.getWorkset()
 				.join(edgeDataSet)
 				.where(0)
 				.equalTo(0);
 
 		// Gather, sum and apply
-		DataSet<Tuple2<K, M>> gatheredSet = richEdges.map(gatherUdf);
+		DataSet<Tuple2<K, M>> gatheredSet = neighbors.map(gatherUdf);
 		DataSet<Tuple2<K, M>> summedSet = gatheredSet.groupBy(0).reduce(sumUdf);
 		DataSet<Vertex<K, VV>> appliedSet = summedSet
 				.join(iteration.getSolutionSet())
@@ -178,12 +178,12 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 		}
 
 		@Override
-		public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> richEdge) throws Exception {
-			RichEdge<VV, EV> userRichEdge = new RichEdge<VV, EV>(richEdge.f0.getValue(),
-					richEdge.f1.getValue());
+		public Tuple2<K, M> map(Tuple2<Vertex<K, VV>, Edge<K, EV>> neighborTuple) throws Exception {
+			Neighbor<VV, EV> neighbor = new Neighbor<VV, EV>(neighborTuple.f0.getValue(),
+					neighborTuple.f1.getValue());
 
-			K key = richEdge.f1.getTarget();
-			M result = this.gatherFunction.gather(userRichEdge);
+			K key = neighborTuple.f1.getTarget();
+			M result = this.gatherFunction.gather(neighbor);
 			return new Tuple2<K, M>(key, result);
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
new file mode 100755
index 0000000..2260022
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.io.Serializable;
+
+/**
+ * This class represents a {@code <sourceVertex, edge>} pair: f0 holds the source vertex value, f1 the edge value.
+ * It is a wrapper around {@code Tuple2<VV, EV>} for convenience in the GatherFunction.
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+public class Neighbor<VV extends Serializable, EV extends Serializable>
+		extends Tuple2<VV, EV> {
+
+	public Neighbor() {}
+
+	public Neighbor(VV src, EV edge) {
+		super(src, edge);
+	}
+
+	public VV getSrcVertexValue() {
+		return this.f0;
+	}
+
+	public EV getEdgeValue() {
+		return this.f1;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/837508df/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
deleted file mode 100755
index 8d4b4d8..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/RichEdge.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-import java.io.Serializable;
-
-/**
- * This class represents a <sourceVertex, edge> pair
- * This is a wrapper around Tuple2<VV, EV> for convenience in the GatherFunction
- * @param <VV> the vertex value type
- * @param <EV> the edge value type
- */
-public class RichEdge<VV extends Serializable, EV extends Serializable>
-		extends Tuple2<VV, EV> {
-
-	public RichEdge() {}
-
-	public RichEdge(VV src, EV edge) {
-		super(src, edge);
-	}
-
-	public VV getSrcVertexValue() {
-		return this.f0;
-	}
-
-	public EV getEdgeValue() {
-		return this.f1;
-	}
-}


[6/9] flink git commit: [FLINK-1514] [gelly] renamed fields in Neighbor class; added forwarded fields hint in GSAIteration

Posted by va...@apache.org.
[FLINK-1514] [gelly] renamed fields in Neighbor class; added forwarded fields hint in GSAIteration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/66d72acd
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/66d72acd
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/66d72acd

Branch: refs/heads/master
Commit: 66d72acd23e3006b23348babc256bb6366290c61
Parents: 4e3165e
Author: vasia <va...@apache.org>
Authored: Tue Apr 21 02:51:04 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sat Apr 25 20:00:59 2015 +0200

----------------------------------------------------------------------
 .../flink/graph/example/GSAConnectedComponentsExample.java     | 2 +-
 .../graph/example/GSASingleSourceShortestPathsExample.java     | 4 ++--
 .../org/apache/flink/graph/gsa/GatherSumApplyIteration.java    | 6 +++++-
 .../src/main/java/org/apache/flink/graph/gsa/Neighbor.java     | 6 +++---
 4 files changed, 11 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/66d72acd/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
index fca7b1d..dba28cd 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSAConnectedComponentsExample.java
@@ -89,7 +89,7 @@ public class GSAConnectedComponentsExample implements ProgramDescription {
 	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
 
 		public Long gather(Neighbor<Long, NullValue> neighbor) {
-			return neighbor.getSrcVertexValue();
+			return neighbor.getNeighborValue();
 		}
 	};
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66d72acd/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
index b4f1f34..166a3d3 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/GSASingleSourceShortestPathsExample.java
@@ -97,8 +97,8 @@ public class GSASingleSourceShortestPathsExample implements ProgramDescription {
 	@SuppressWarnings("serial")
 	private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
 
-		public Double gather(Neighbor<Double, Double> richEdge) {
-			return richEdge.getSrcVertexValue() + richEdge.getEdgeValue();
+		public Double gather(Neighbor<Double, Double> neighbor) {
+			return neighbor.getNeighborValue() + neighbor.getEdgeValue();
 		}
 	};
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66d72acd/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index 53fa7a4..992f840 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.JoinOperator;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
@@ -127,12 +128,15 @@ public class GatherSumApplyIteration<K extends Comparable<K> & Serializable,
 		// Gather, sum and apply
 		DataSet<Tuple2<K, M>> gatheredSet = neighbors.map(gatherUdf);
 		DataSet<Tuple2<K, M>> summedSet = gatheredSet.groupBy(0).reduce(sumUdf);
-		DataSet<Vertex<K, VV>> appliedSet = summedSet
+		JoinOperator<?, ?, Vertex<K, VV>> appliedSet = summedSet
 				.join(iteration.getSolutionSet())
 				.where(0)
 				.equalTo(0)
 				.with(applyUdf);
 
+		// let the operator know that we preserve the key field
+		appliedSet.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
+
 		return iteration.closeWith(appliedSet, appliedSet);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/66d72acd/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
index 39f18d5..5a06af9 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
@@ -34,11 +34,11 @@ public class Neighbor<VV extends Serializable, EV extends Serializable>
 
 	public Neighbor() {}
 
-	public Neighbor(VV src, EV edge) {
-		super(src, edge);
+	public Neighbor(VV neighborValue, EV edgeValue) {
+		super(neighborValue, edgeValue);
 	}
 
-	public VV getSrcVertexValue() {
+	public VV getNeighborValue() {
 		return this.f0;
 	}