You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/01 21:55:04 UTC

flink git commit: [FLINK-2663] [gelly] Updated Gelly library methods to use generic key types

Repository: flink
Updated Branches:
  refs/heads/master bbd97354b -> 9f7110748


[FLINK-2663] [gelly] Updated Gelly library methods to use generic key types

This squashes the following commits:

[gelly] Added missing Javadocs to GSA classes

[FLINK-2663] [gelly] Updated Gelly library methods to also use generic vertex/edge values where possible

This closes #1152


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9f711074
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9f711074
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9f711074

Branch: refs/heads/master
Commit: 9f7110748434d2bdea96f16c70f4f0894261ae69
Parents: bbd9735
Author: vasia <va...@apache.org>
Authored: Sun Sep 20 21:20:11 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Thu Oct 1 21:22:35 2015 +0200

----------------------------------------------------------------------
 .../graph/example/ConnectedComponents.java      |   2 +-
 .../flink/graph/example/MusicProfiles.java      |   3 +-
 .../apache/flink/graph/gsa/ApplyFunction.java   |  15 +++
 .../apache/flink/graph/gsa/GatherFunction.java  |  16 +++
 .../org/apache/flink/graph/gsa/SumFunction.java |  18 +++-
 .../flink/graph/library/CommunityDetection.java |  53 ++++++----
 .../graph/library/ConnectedComponents.java      |  32 +++---
 .../graph/library/GSAConnectedComponents.java   |  22 ++--
 .../apache/flink/graph/library/GSAPageRank.java |   8 +-
 .../library/GSASingleSourceShortestPaths.java   |   8 +-
 .../flink/graph/library/GSATriangleCount.java   | 102 +++++++++----------
 .../flink/graph/library/LabelPropagation.java   |  12 ++-
 .../apache/flink/graph/library/PageRank.java    |   8 +-
 .../library/SingleSourceShortestPaths.java      |   8 +-
 .../flink/graph/utils/NullValueEdgeMapper.java  |  32 ++++++
 .../apache/flink/graph/gsa/GSACompilerTest.java |   1 -
 .../flink/graph/gsa/GSATranslationTest.java     |   1 -
 .../flink/graph/test/GatherSumApplyITCase.java  |   8 +-
 .../test/library/CommunityDetectionITCase.java  |   4 +-
 ...ctedComponentsWithRandomisedEdgesITCase.java |   3 +-
 .../test/library/LabelPropagationITCase.java    |   8 +-
 .../graph/test/library/PageRankITCase.java      |   8 +-
 .../graph/test/library/TriangleCountITCase.java |   6 +-
 .../test/operations/GraphOperationsITCase.java  |   1 -
 24 files changed, 242 insertions(+), 137 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
index bd08190..4189602 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/ConnectedComponents.java
@@ -69,7 +69,7 @@ public class ConnectedComponents implements ProgramDescription {
 		}, env);
 
 		DataSet<Vertex<Long, Long>> verticesWithMinIds = graph
-				.run(new GSAConnectedComponents(maxIterations)).getVertices();
+				.run(new GSAConnectedComponents<Long, NullValue>(maxIterations));
 
 		// emit result
 		if (fileOutput) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index a56224d..e347bc5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -153,8 +153,7 @@ public class MusicProfiles implements ProgramDescription {
 							public Long map(Tuple2<Long, Long> value) {
 								return value.f1;
 							}
-						}).run(new LabelPropagation<String>(maxIterations))
-				.getVertices();
+						}).run(new LabelPropagation<String, NullValue>(maxIterations));
 
 		if (fileOutput) {
 			verticesWithCommunity.writeAsCsv(communitiesOutputPath, "\n", "\t");

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
index ed0cf70..5a8e97a 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/ApplyFunction.java
@@ -27,6 +27,13 @@ import org.apache.flink.util.Collector;
 import java.io.Serializable;
 import java.util.Collection;
 
+/**
+ * The base class for the third and last step of a {@link GatherSumApplyIteration}.
+ *
+ * @param <K> the vertex ID type
+ * @param <VV> the vertex value type
+ * @param <M> the input type (produced by the Sum phase)
+ */
 @SuppressWarnings("serial")
 public abstract class ApplyFunction<K, VV, M> implements Serializable {
 
@@ -51,6 +58,14 @@ public abstract class ApplyFunction<K, VV, M> implements Serializable {
 
 	//---------------------------------------------------------------------------------------------
 
+	/**
+	 * This method is invoked once per superstep, after the {@link SumFunction} 
+	 * in a {@link GatherSumApplyIteration}.
+	 * It updates the Vertex values.
+	 * 
+	 * @param newValue the value computed during the current superstep.
+	 * @param currentValue the current Vertex value.
+	 */
 	public abstract void apply(M newValue, VV currentValue);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
index 5a09a5a..563b20e 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherFunction.java
@@ -25,6 +25,13 @@ import org.apache.flink.types.Value;
 import java.io.Serializable;
 import java.util.Collection;
 
+/**
+ * The base class for the first step of a {@link GatherSumApplyIteration}.
+ * 
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <M> the output type 
+ */
 @SuppressWarnings("serial")
 public abstract class GatherFunction<VV, EV, M> implements Serializable {
 
@@ -49,6 +56,15 @@ public abstract class GatherFunction<VV, EV, M> implements Serializable {
 
 	//---------------------------------------------------------------------------------------------
 
+	/**
+	 * This method is invoked once per superstep, for each {@link Neighbor} of each Vertex,
+	 * at the beginning of the superstep in a {@link GatherSumApplyIteration}.
+	 * It needs to produce a partial value, which will be combined with other partial values
+	 * in the next phase of the iteration.
+	 *  
+	 * @param neighbor the input Neighbor. It provides access to the source Vertex and the Edge objects.
+	 * @return a partial result to be combined in the Sum phase.
+	 */
 	public abstract M gather(Neighbor<VV, EV> neighbor);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
index 69baae4..f27e275 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -25,6 +25,13 @@ import org.apache.flink.types.Value;
 import java.io.Serializable;
 import java.util.Collection;
 
+/**
+ * The base class for the second step of a {@link GatherSumApplyIteration}.
+ *
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <M> the output type
+ */
 @SuppressWarnings("serial")
 public abstract class SumFunction<VV, EV, M> implements Serializable {
 
@@ -48,7 +55,16 @@ public abstract class SumFunction<VV, EV, M> implements Serializable {
 	}
 
 	//---------------------------------------------------------------------------------------------
-
+	/**
+	 * This method is invoked once per superstep, after the {@link GatherFunction} 
+	 * in a {@link GatherSumApplyIteration}.
+	 * It combines the partial values produced by {@link GatherFunction#gather(Neighbor)}
+	 * in pairs, until a single value has been computed.
+	 * 
+	 * @param arg0 the first partial value.
+	 * @param arg1 the second partial value.
+	 * @return the combined value.
+	 */
 	public abstract M sum(M arg0, M arg1);
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
index 31488ee..0dd39fc 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
@@ -19,6 +19,8 @@
 package org.apache.flink.graph.library;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
@@ -34,18 +36,21 @@ import java.util.TreeMap;
 /**
  * Community Detection Algorithm.
  *
- * Initially, each vertex is assigned a tuple formed of its own id along with a score equal to 1.0, as value.
+ * This implementation expects Long Vertex values and labels. The Vertex values of the input Graph provide the initial label assignments.
+ * 
+ * Initially, each vertex is assigned a tuple formed of its own initial value along with a score equal to 1.0.
  * The vertices propagate their labels and max scores in iterations, each time adopting the label with the
  * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction
  * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value.
  *
  * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
  * is reached.
+ * 
+ * @param <K> the Vertex ID type 
  *
  * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
  */
-public class CommunityDetection implements
-	GraphAlgorithm<Long, Long, Double, Graph<Long, Long, Double>> {
+public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Graph<K, Long, Double>> {
 
 	private Integer maxIterations;
 
@@ -58,20 +63,22 @@ public class CommunityDetection implements
 	}
 
 	@Override
-	public Graph<Long, Long, Double> run(Graph<Long, Long, Double> graph) {
+	public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) {
 
-		Graph<Long, Long, Double> undirectedGraph = graph.getUndirected();
+		DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = graph.getVertices()
+				.map(new AddScoreToVertexValuesMapper<K>());
 
-		Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph
-				.mapVertices(new AddScoreToVertexValuesMapper());
+		Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
+				Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected();
 
-		return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater(delta),
-				new LabelMessenger(), maxIterations)
-				.mapVertices(new RemoveScoreFromVertexValuesMapper());
+		return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater<K>(delta),
+				new LabelMessenger<K>(), maxIterations)
+				.mapVertices(new RemoveScoreFromVertexValuesMapper<K>());
 	}
 
 	@SuppressWarnings("serial")
-	public static final class VertexLabelUpdater extends VertexUpdateFunction<Long, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+	public static final class VertexLabelUpdater<K> extends VertexUpdateFunction<
+		K, Tuple2<Long, Double>, Tuple2<Long, Double>> {
 
 		private Double delta;
 
@@ -80,7 +87,7 @@ public class CommunityDetection implements
 		}
 
 		@Override
-		public void updateVertex(Vertex<Long, Tuple2<Long, Double>> vertex,
+		public void updateVertex(Vertex<K, Tuple2<Long, Double>> vertex,
 								MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
 
 			// we would like these two maps to be ordered
@@ -140,34 +147,36 @@ public class CommunityDetection implements
 	}
 
 	@SuppressWarnings("serial")
-	public static final class LabelMessenger extends MessagingFunction<Long, Tuple2<Long, Double>,
+	public static final class LabelMessenger<K> extends MessagingFunction<K, Tuple2<Long, Double>,
 			Tuple2<Long, Double>, Double> {
 
 		@Override
-		public void sendMessages(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
+		public void sendMessages(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
 
-			for(Edge<Long, Double> edge : getEdges()) {
+			for(Edge<K, Double> edge : getEdges()) {
 				sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0,
 						vertex.getValue().f1 * edge.getValue()));
 			}
-
 		}
 	}
 
 	@SuppressWarnings("serial")
-	public static final class AddScoreToVertexValuesMapper implements MapFunction<Vertex<Long, Long>, Tuple2<Long, Double>> {
+	@ForwardedFields("f0")
+	public static final class AddScoreToVertexValuesMapper<K> implements MapFunction<
+		Vertex<K, Long>, Vertex<K, Tuple2<Long, Double>>> {
 
-		@Override
-		public Tuple2<Long, Double> map(Vertex<Long, Long> vertex) throws Exception {
-			return new Tuple2<Long, Double>(vertex.getValue(), 1.0);
+		public Vertex<K, Tuple2<Long, Double>> map(Vertex<K, Long> vertex) {
+			return new Vertex<K, Tuple2<Long, Double>>(
+					vertex.getId(), new Tuple2<Long, Double>(vertex.getValue(), 1.0));
 		}
 	}
 
 	@SuppressWarnings("serial")
-	public static final class RemoveScoreFromVertexValuesMapper implements MapFunction<Vertex<Long, Tuple2<Long, Double>>, Long> {
+	public static final class RemoveScoreFromVertexValuesMapper<K> implements MapFunction<
+		Vertex<K, Tuple2<Long, Double>>, Long> {
 
 		@Override
-		public Long map(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
+		public Long map(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
 			return vertex.getValue().f0;
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
index 871f315..ed853fe 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -18,27 +18,32 @@
 
 package org.apache.flink.graph.library;
 
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
 
 /**
- * A vertex-centric implementation of the Connected components algorithm.
+ * A vertex-centric implementation of the Connected Components algorithm.
  *
- * Initially, each vertex will have its own ID as a value(is its own component). The vertices propagate their
- * current component ID in iterations, each time adopting a new value from the received neighbor IDs,
+ * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs.
+ * The vertices propagate their current component ID in iterations, each time adopting a new value from the received neighbor IDs,
  * provided that the value is less than the current minimum.
  *
  * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
  * is reached.
+ * 
+ * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID.
+ * 
+ * @see org.apache.flink.graph.library.GSAConnectedComponents
  */
 @SuppressWarnings("serial")
-public class ConnectedComponents implements
-	GraphAlgorithm<Long, Long, NullValue, Graph<Long, Long, NullValue>> {
+public class ConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
 
 	private Integer maxIterations;
 
@@ -47,21 +52,24 @@ public class ConnectedComponents implements
 	}
 
 	@Override
-	public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> graph) throws Exception {
+	public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
 
-		Graph<Long, Long, NullValue> undirectedGraph = graph.getUndirected();
+		Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>())
+				.getUndirected();
 
 		// initialize vertex values and run the Vertex Centric Iteration
-		return undirectedGraph.runVertexCentricIteration(new CCUpdater(), new CCMessenger(), maxIterations);
+		return undirectedGraph.runVertexCentricIteration(
+				new CCUpdater<K>(), new CCMessenger<K>(), maxIterations)
+				.getVertices();
 	}
 
 	/**
 	 * Updates the value of a vertex by picking the minimum neighbor ID out of all the incoming messages.
 	 */
-	public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
+	public static final class CCUpdater<K> extends VertexUpdateFunction<K, Long, Long> {
 
 		@Override
-		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> messages) throws Exception {
+		public void updateVertex(Vertex<K, Long> vertex, MessageIterator<Long> messages) throws Exception {
 			long min = Long.MAX_VALUE;
 
 			for (long msg : messages) {
@@ -78,10 +86,10 @@ public class ConnectedComponents implements
 	/**
 	 * Distributes the minimum ID associated with a given vertex among all the target vertices.
 	 */
-	public static final class CCMessenger extends MessagingFunction<Long, Long, Long, NullValue> {
+	public static final class CCMessenger<K> extends MessagingFunction<K, Long, Long, NullValue> {
 
 		@Override
-		public void sendMessages(Vertex<Long, Long> vertex) throws Exception {
+		public void sendMessages(Vertex<K, Long> vertex) throws Exception {
 			// send current minimum to neighbors
 			sendMessageToAllNeighbors(vertex.getValue());
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
index f852f3e..77bc2cf 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -18,19 +18,25 @@
 
 package org.apache.flink.graph.library;
 
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.SumFunction;
 import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
 
 /**
  * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration.
+ * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs.
+ * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID.
+ * 
+ * @see org.apache.flink.graph.library.ConnectedComponents
  */
-public class GSAConnectedComponents implements
-	GraphAlgorithm<Long, Long, NullValue, Graph<Long, Long, NullValue>> {
+public class GSAConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
 
 	private Integer maxIterations;
 
@@ -39,13 +45,15 @@ public class GSAConnectedComponents implements
 	}
 
 	@Override
-	public Graph<Long, Long, NullValue> run(Graph<Long, Long, NullValue> graph) throws Exception {
+	public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
 
-		Graph<Long, Long, NullValue> undirectedGraph = graph.getUndirected();
+		Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>())
+				.getUndirected();
 
 		// initialize vertex values and run the Vertex Centric Iteration
-		return undirectedGraph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(), new UpdateComponentId(),
-				maxIterations);
+		return undirectedGraph.runGatherSumApplyIteration(
+				new GatherNeighborIds(), new SelectMinId(), new UpdateComponentId<K>(),
+				maxIterations).getVertices();
 	}
 
 	// --------------------------------------------------------------------------------------------
@@ -69,7 +77,7 @@ public class GSAConnectedComponents implements
 	};
 
 	@SuppressWarnings("serial")
-	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
+	private static final class UpdateComponentId<K> extends ApplyFunction<K, Long, Long> {
 
 		public void apply(Long summedValue, Long origValue) {
 			if (summedValue < origValue) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
index 6ce2ed6..df3e89a 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.Neighbor;
@@ -36,7 +37,7 @@ import org.apache.flink.graph.gsa.SumFunction;
  * 
  * The implementation assumes that each page has at least one incoming and one outgoing link.
  */
-public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
+public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
 
 	private double beta;
 	private int maxIterations;
@@ -58,7 +59,7 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, Graph<K
 	}
 
 	@Override
-	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
+	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
 
 		if (numberOfVertices == 0) {
 			numberOfVertices = network.numberOfVertices();
@@ -70,7 +71,8 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, Graph<K
 				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
 
 		return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
-				new UpdateRanks<K>(beta, numberOfVertices), maxIterations);
+				new UpdateRanks<K>(beta, numberOfVertices), maxIterations)
+				.getVertices();
 	}
 
 	// --------------------------------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
index 18bdd1d..5a76072 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.library;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
@@ -31,7 +32,7 @@ import org.apache.flink.graph.gsa.Neighbor;
  * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration
  */
 public class GSASingleSourceShortestPaths<K> implements
-	GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
+	GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
 
 	private final K srcVertexId;
 	private final Integer maxIterations;
@@ -42,11 +43,12 @@ public class GSASingleSourceShortestPaths<K> implements
 	}
 
 	@Override
-	public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
+	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
 
 		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
 				.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
-						new UpdateDistance<K>(), maxIterations);
+						new UpdateDistance<K>(), maxIterations)
+						.getVertices();
 	}
 
 	@SuppressWarnings("serial")

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
index 3d4d902..76d170d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
@@ -23,7 +23,6 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.ReduceNeighborsFunction;
@@ -46,62 +45,61 @@ import java.util.TreeMap;
  *
  * This implementation is non - iterative.
  *
- * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet of
- * Tuple1 which contains a single integer representing the number of triangles.
+ * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet
+ * which contains a single integer representing the number of triangles.
  */
-public class GSATriangleCount implements
-		GraphAlgorithm<Long, NullValue, NullValue, DataSet<Tuple1<Integer>>> {
+public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements
+		GraphAlgorithm<K, VV, EV, DataSet<Integer>> {
 
 	@SuppressWarnings("serial")
 	@Override
-	public DataSet<Tuple1<Integer>> run(Graph<Long, NullValue, NullValue> input) throws Exception {
+	public DataSet<Integer> run(Graph<K, VV, EV> input) throws Exception {
 
 		ExecutionEnvironment env = input.getContext();
 
 		// order the edges so that src is always higher than trg
-		DataSet<Edge<Long, NullValue>> edges = input.getEdges()
-				.map(new OrderEdges()).distinct();
+		DataSet<Edge<K, NullValue>> edges = input.getEdges().map(new OrderEdges<K, EV>()).distinct();
 
-		Graph<Long, TreeMap<Long, Integer>, NullValue> graph = Graph.fromDataSet(edges,
-				new VertexInitializer(), env);
+		Graph<K, TreeMap<K, Integer>, NullValue> graph = Graph.fromDataSet(edges,
+				new VertexInitializer<K>(), env);
 
 		// select neighbors with ids higher than the current vertex id
 		// Gather: a no-op in this case
 		// Sum: create the set of neighbors
-		DataSet<Tuple2<Long, TreeMap<Long, Integer>>> higherIdNeighbors =
-				graph.reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN);
+		DataSet<Tuple2<K, TreeMap<K, Integer>>> higherIdNeighbors =
+				graph.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
 
-		Graph<Long, TreeMap<Long, Integer>, NullValue> graphWithReinitializedVertexValues =
-				graph.mapVertices(new VertexInitializerEmptyTreeMap());
+		Graph<K, TreeMap<K, Integer>, NullValue> graphWithReinitializedVertexValues =
+				graph.mapVertices(new VertexInitializerEmptyTreeMap<K>());
 
 		// Apply: attach the computed values to the vertices
 		// joinWithVertices to update the node values
-		DataSet<Vertex<Long, TreeMap<Long, Integer>>> verticesWithHigherIdNeighbors =
-				graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues()).getVertices();
+		DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithHigherIdNeighbors =
+				graphWithReinitializedVertexValues.joinWithVertices(higherIdNeighbors, new AttachValues<K>()).getVertices();
 
-		Graph<Long, TreeMap<Long,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors,
+		Graph<K, TreeMap<K,Integer>, NullValue> graphWithNeighbors = Graph.fromDataSet(verticesWithHigherIdNeighbors,
 				edges, env);
 
 		// propagate each received value to neighbors with higher id
 		// Gather: a no-op in this case
 		// Sum: propagate values
-		DataSet<Tuple2<Long, TreeMap<Long, Integer>>> propagatedValues = graphWithNeighbors
-				.reduceOnNeighbors(new GatherHigherIdNeighbors(), EdgeDirection.IN);
+		DataSet<Tuple2<K, TreeMap<K, Integer>>> propagatedValues = graphWithNeighbors
+				.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
 
 		// Apply: attach propagated values to vertices
-		DataSet<Vertex<Long, TreeMap<Long, Integer>>> verticesWithPropagatedValues =
-				graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues()).getVertices();
+		DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithPropagatedValues =
+				graphWithReinitializedVertexValues.joinWithVertices(propagatedValues, new AttachValues<K>()).getVertices();
 
-		Graph<Long, TreeMap<Long, Integer>, NullValue> graphWithPropagatedNeighbors =
+		Graph<K, TreeMap<K, Integer>, NullValue> graphWithPropagatedNeighbors =
 				Graph.fromDataSet(verticesWithPropagatedValues, graphWithNeighbors.getEdges(), env);
 
 		// Scatter: compute the number of triangles
-		DataSet<Tuple1<Integer>> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets()
-				.map(new ComputeTriangles()).reduce(new ReduceFunction<Tuple1<Integer>>() {
+		DataSet<Integer> numberOfTriangles = graphWithPropagatedNeighbors.getTriplets()
+				.map(new ComputeTriangles<K>()).reduce(new ReduceFunction<Integer>() {
 
 					@Override
-					public Tuple1<Integer> reduce(Tuple1<Integer> firstTuple, Tuple1<Integer> secondTuple) throws Exception {
-						return new Tuple1<Integer>(firstTuple.f0 + secondTuple.f0);
+					public Integer reduce(Integer first, Integer second) throws Exception {
+						return first + second;
 					}
 				});
 
@@ -109,24 +107,25 @@ public class GSATriangleCount implements
 	}
 
 	@SuppressWarnings("serial")
-	private static final class OrderEdges implements MapFunction<Edge<Long, NullValue>, Edge<Long, NullValue>> {
+	private static final class OrderEdges<K extends Comparable<K>, EV> implements
+		MapFunction<Edge<K, EV>, Edge<K, NullValue>> {
 
 		@Override
-		public Edge<Long, NullValue> map(Edge<Long, NullValue> edge) throws Exception {
-			if (edge.getSource() < edge.getTarget()) {
-				return new Edge<Long, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance());
+		public Edge<K, NullValue> map(Edge<K, EV> edge) throws Exception {
+			if (edge.getSource().compareTo(edge.getTarget()) < 0) {
+				return new Edge<K, NullValue>(edge.getTarget(), edge.getSource(), NullValue.getInstance());
 			} else {
-				return edge;
+				return new Edge<K, NullValue>(edge.getSource(), edge.getTarget(), NullValue.getInstance());
 			}
 		}
 	}
 
 	@SuppressWarnings("serial")
-	private static final class VertexInitializer implements MapFunction<Long, TreeMap<Long, Integer>> {
+	private static final class VertexInitializer<K> implements MapFunction<K, TreeMap<K, Integer>> {
 
 		@Override
-		public TreeMap<Long, Integer> map(Long value) throws Exception {
-			TreeMap<Long, Integer> neighbors = new TreeMap<Long, Integer>();
+		public TreeMap<K, Integer> map(K value) throws Exception {
+			TreeMap<K, Integer> neighbors = new TreeMap<K, Integer>();
 			neighbors.put(value, 1);
 
 			return neighbors;
@@ -134,31 +133,32 @@ public class GSATriangleCount implements
 	}
 
 	@SuppressWarnings("serial")
-	private static final class VertexInitializerEmptyTreeMap implements
-			MapFunction<Vertex<Long, TreeMap<Long, Integer>>, TreeMap<Long, Integer>> {
+	private static final class VertexInitializerEmptyTreeMap<K> implements
+			MapFunction<Vertex<K, TreeMap<K, Integer>>, TreeMap<K, Integer>> {
 
 		@Override
-		public TreeMap<Long, Integer> map(Vertex<Long, TreeMap<Long, Integer>> vertex) throws Exception {
-			return new TreeMap<Long, Integer>();
+		public TreeMap<K, Integer> map(Vertex<K, TreeMap<K, Integer>> vertex) throws Exception {
+			return new TreeMap<K, Integer>();
 		}
 	}
 
 	@SuppressWarnings("serial")
-	private static final class AttachValues implements MapFunction<Tuple2<TreeMap<Long, Integer>,
-			TreeMap<Long, Integer>>, TreeMap<Long, Integer>> {
+	private static final class AttachValues<K> implements MapFunction<Tuple2<TreeMap<K, Integer>,
+			TreeMap<K, Integer>>, TreeMap<K, Integer>> {
 
 		@Override
-		public TreeMap<Long, Integer> map(Tuple2<TreeMap<Long, Integer>, TreeMap<Long, Integer>> tuple2) throws Exception {
+		public TreeMap<K, Integer> map(Tuple2<TreeMap<K, Integer>, TreeMap<K, Integer>> tuple2) throws Exception {
 			return tuple2.f1;
 		}
 	}
 
 	@SuppressWarnings("serial")
-	private static final class GatherHigherIdNeighbors implements ReduceNeighborsFunction<TreeMap<Long,Integer>> {
+	private static final class GatherHigherIdNeighbors<K> implements
+		ReduceNeighborsFunction<TreeMap<K,Integer>> {
 
 		@Override
-		public TreeMap<Long,Integer> reduceNeighbors(TreeMap<Long,Integer> first, TreeMap<Long,Integer> second) {
-			for (Long key : second.keySet()) {
+		public TreeMap<K, Integer> reduceNeighbors(TreeMap<K,Integer> first, TreeMap<K,Integer> second) {
+			for (K key : second.keySet()) {
 				Integer value = first.get(key);
 				if (value != null) {
 					first.put(key, value + second.get(key));
@@ -171,20 +171,20 @@ public class GSATriangleCount implements
 	}
 
 	@SuppressWarnings("serial")
-	private static final class ComputeTriangles implements MapFunction<Triplet<Long, TreeMap<Long, Integer>, NullValue>,
-			Tuple1<Integer>> {
+	private static final class ComputeTriangles<K> implements MapFunction<Triplet<K, TreeMap<K, Integer>, NullValue>,
+			Integer> {
 
 		@Override
-		public Tuple1<Integer> map(Triplet<Long, TreeMap<Long, Integer>, NullValue> triplet) throws Exception {
+		public Integer map(Triplet<K, TreeMap<K, Integer>, NullValue> triplet) throws Exception {
 
-			Vertex<Long, TreeMap<Long, Integer>> srcVertex = triplet.getSrcVertex();
-			Vertex<Long, TreeMap<Long, Integer>> trgVertex = triplet.getTrgVertex();
+			Vertex<K, TreeMap<K, Integer>> srcVertex = triplet.getSrcVertex();
+			Vertex<K, TreeMap<K, Integer>> trgVertex = triplet.getTrgVertex();
 			int triangles = 0;
 
 			if(trgVertex.getValue().get(srcVertex.getId()) != null) {
-				triangles=trgVertex.getValue().get(srcVertex.getId());
+				triangles = trgVertex.getValue().get(srcVertex.getId());
 			}
-			return new Tuple1<Integer>(triangles);
+			return triangles;
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 2dd2180..82dfee7 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -18,12 +18,14 @@
 
 package org.apache.flink.graph.library;
 
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
 import org.apache.flink.types.NullValue;
 
 import java.util.HashMap;
@@ -42,8 +44,8 @@ import java.util.Map.Entry;
  */
 @SuppressWarnings("serial")
 
-public class LabelPropagation<K extends Comparable<K>> implements
-	GraphAlgorithm<K, Long, NullValue, Graph<K, Long, NullValue>> {
+public class LabelPropagation<K extends Comparable<K>, EV> implements GraphAlgorithm<K, Long, EV,
+	DataSet<Vertex<K, Long>>> {
 
 	private final int maxIterations;
 
@@ -52,12 +54,12 @@ public class LabelPropagation<K extends Comparable<K>> implements
 	}
 
 	@Override
-	public Graph<K, Long, NullValue> run(Graph<K, Long, NullValue> input) {
+	public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> input) {
 
 		// iteratively adopt the most frequent label among the neighbors
 		// of each vertex
-		return input.runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
-				maxIterations);
+		return input.mapEdges(new NullValueEdgeMapper<K, EV>()).runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
+				maxIterations).getVertices();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 47f7acd..8193dba 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -37,8 +37,7 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction;
  * 
  * The implementation assumes that each page has at least one incoming and one outgoing link.
  */
-public class PageRank<K> implements
-	GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
+public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
 
 	private double beta;
 	private int maxIterations;
@@ -66,7 +65,7 @@ public class PageRank<K> implements
 	}
 
 	@Override
-	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
+	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
 
 		if (numberOfVertices == 0) {
 			numberOfVertices = network.numberOfVertices();
@@ -78,7 +77,8 @@ public class PageRank<K> implements
 				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
 
 		return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
-				new RankMessenger<K>(numberOfVertices), maxIterations);
+				new RankMessenger<K>(numberOfVertices), maxIterations)
+				.getVertices();
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 1911f73..60c4c17 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph.library;
 
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
@@ -31,8 +32,7 @@ import org.apache.flink.graph.spargel.VertexUpdateFunction;
  * This is an implementation of the Single-Source-Shortest Paths algorithm, using a vertex-centric iteration.
  */
 @SuppressWarnings("serial")
-public class SingleSourceShortestPaths<K> implements
-	GraphAlgorithm<K, Double, Double, Graph<K, Double, Double>> {
+public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
 
 	private final K srcVertexId;
 	private final Integer maxIterations;
@@ -43,11 +43,11 @@ public class SingleSourceShortestPaths<K> implements
 	}
 
 	@Override
-	public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
+	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
 
 		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
 				.runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
-				maxIterations);
+				maxIterations).getVertices();
 	}
 
 	public static final class InitVerticesMapper<K>	implements MapFunction<Vertex<K, Double>, Double> {

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
new file mode 100644
index 0000000..2bd4719
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+public class NullValueEdgeMapper<K, EV> implements	MapFunction<Edge<K, EV>, NullValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	public NullValue map(Edge<K, EV> edge) {
+		return NullValue.getInstance();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
index 7a66639..2ad203f 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
@@ -56,7 +56,6 @@ public class GSACompilerTest extends CompilerTestBase {
 			env.setParallelism(DEFAULT_PARALLELISM);
 			// compose test program
 			{
-				@SuppressWarnings("unchecked")
 				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
 						1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
index 0a7b1c7..ced7508 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
@@ -71,7 +71,6 @@ public class GSATranslationTest {
 			// ------------ construct the test program ------------------
 			{
 
-				@SuppressWarnings("unchecked")
 				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
 						1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
index 4b381b6..0213f02 100755
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyITCase.java
@@ -55,8 +55,8 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 				ConnectedComponentsDefaultData.getDefaultEdgeDataSet(env),
 				new InitMapperCC(), env);
 
-        List<Vertex<Long, Long>> result = inputGraph.run(new GSAConnectedComponents(16))
-        		.getVertices().collect();
+        List<Vertex<Long, Long>> result = inputGraph.run(
+        		new GSAConnectedComponents<Long, NullValue>(16)).collect();
 
 		expectedResult = "1,1\n" +
 				"2,1\n" +
@@ -78,8 +78,8 @@ public class GatherSumApplyITCase extends MultipleProgramsTestBase {
 				SingleSourceShortestPathsData.getDefaultEdgeDataSet(env),
 				new InitMapperSSSP(), env);
 
-        List<Vertex<Long, Double>> result = inputGraph.run(new GSASingleSourceShortestPaths<Long>(1l, 16))
-        		.getVertices().collect();
+        List<Vertex<Long, Double>> result = inputGraph.run(
+        		new GSASingleSourceShortestPaths<Long>(1l, 16)).collect();
 
 		expectedResult = "1,0.0\n" +
 				"2,12.0\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
index 104996e..421eaa9 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/CommunityDetectionITCase.java
@@ -50,7 +50,7 @@ public class CommunityDetectionITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
 				CommunityDetectionData.getSimpleEdgeDataSet(env), new InitLabels(), env);
 
-        List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection(1, CommunityDetectionData.DELTA))
+        List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
         		.getVertices().collect();
 
 		expected = CommunityDetectionData.COMMUNITIES_SINGLE_ITERATION;
@@ -66,7 +66,7 @@ public class CommunityDetectionITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Double> inputGraph = Graph.fromDataSet(
 				CommunityDetectionData.getTieEdgeDataSet(env), new InitLabels(), env);
 
-        List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection(1, CommunityDetectionData.DELTA))
+        List<Vertex<Long, Long>> result = inputGraph.run(new CommunityDetection<Long>(1, CommunityDetectionData.DELTA))
         		.getVertices().collect();
 		expected = CommunityDetectionData.COMMUNITIES_WITH_TIE;
 		compareResultAsTuples(result, expected);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
index ef4b467..9eb7387 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -60,8 +60,7 @@ public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTes
 
 		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
 
-		DataSet<Vertex<Long, Long>> result = graph
-				.run(new ConnectedComponents(100)).getVertices();
+		DataSet<Vertex<Long, Long>> result = graph.run(new ConnectedComponents<Long, NullValue>(100));
 
 		result.writeAsCsv(resultPath, "\n", " ");
 		env.execute();

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
index da36ef6..8785b0d 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/LabelPropagationITCase.java
@@ -51,8 +51,8 @@ public class LabelPropagationITCase extends MultipleProgramsTestBase {
 				LabelPropagationData.getDefaultVertexSet(env),
 				LabelPropagationData.getDefaultEdgeDataSet(env), env);
 
-        List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long>(1))
-        		.getVertices().collect();
+        List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long, NullValue>(1))
+        		.collect();
 
 		expectedResult = LabelPropagationData.LABELS_AFTER_1_ITERATION;
 		compareResultAsTuples(result, expectedResult);
@@ -69,8 +69,8 @@ public class LabelPropagationITCase extends MultipleProgramsTestBase {
 				LabelPropagationData.getTieVertexSet(env),
 				LabelPropagationData.getTieEdgeDataSet(env), env);
 
-        List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long>(1))
-        		.getVertices().collect();
+        List<Vertex<Long, Long>> result = inputGraph.run(new LabelPropagation<Long, NullValue>(1))
+        		.collect();
 
 		expectedResult = LabelPropagationData.LABELS_WITH_TIE;
 		compareResultAsTuples(result, expectedResult);

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
index cc0327f..94c7713 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/PageRankITCase.java
@@ -51,7 +51,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
 
         List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
-        		.getVertices().collect();
+        		.collect();
         
         compareWithDelta(result, expectedResult, 0.01);
 	}
@@ -64,7 +64,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
 
         List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
-        		.getVertices().collect();
+        		.collect();
         
         compareWithDelta(result, expectedResult, 0.01);
 	}
@@ -77,7 +77,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
 
         List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 5, 3))
-        		.getVertices().collect();
+        		.collect();
         
         compareWithDelta(result, expectedResult, 0.01);
 	}
@@ -90,7 +90,7 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
 
         List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3))
-        		.getVertices().collect();
+        		.collect();
         
         compareWithDelta(result, expectedResult, 0.01);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
index 047bbf7..1d9ab9f 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/library/TriangleCountITCase.java
@@ -19,12 +19,12 @@
 package org.apache.flink.graph.test.library;
 
 import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple1;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.example.utils.TriangleCountData;
 import org.apache.flink.graph.library.GSATriangleCount;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.NullValue;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
@@ -48,9 +48,9 @@ public class TriangleCountITCase extends MultipleProgramsTestBase {
 		Graph<Long, NullValue, NullValue> graph = Graph.fromDataSet(TriangleCountData.getDefaultEdgeDataSet(env),
 				env).getUndirected();
 
-		List<Tuple1<Integer>> numberOfTriangles = graph.run(new GSATriangleCount()).collect();
+		List<Integer> numberOfTriangles = graph.run(new GSATriangleCount<Long, NullValue, NullValue>()).collect();
 		expectedResult = TriangleCountData.RESULTED_NUMBER_OF_TRIANGLES;
 
-		compareResultAsTuples(numberOfTriangles, expectedResult);
+		Assert.assertEquals(numberOfTriangles.get(0).intValue(), Integer.parseInt(expectedResult));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9f711074/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
index a06f881..ffc9da9 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/GraphOperationsITCase.java
@@ -326,7 +326,6 @@ public class GraphOperationsITCase extends MultipleProgramsTestBase {
 		compareResultAsTuples(result, expectedResult);
 	}
 
-	@SuppressWarnings("unchecked")
 	@Test
 	public void testDifference2() throws Exception {
 		/*