You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/05/19 23:03:31 UTC

[02/10] flink git commit: [FLINK-1523] [gelly] Vertex centric iteration extensions

[FLINK-1523] [gelly] Vertex centric iteration extensions

Removed trailing splits into vertex key and value

Made the extensions optional and separate

Removed overhead when the options are not used


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/585d27d0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/585d27d0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/585d27d0

Branch: refs/heads/master
Commit: 585d27d0d7c882797f813a25d21485187b546145
Parents: 8f4039d
Author: andralungu <lu...@gmail.com>
Authored: Mon Mar 16 14:04:23 2015 +0100
Committer: vasia <va...@apache.org>
Committed: Tue May 19 22:38:03 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |  62 +++
 .../main/java/org/apache/flink/graph/Graph.java |   4 +-
 .../flink/graph/IterationConfiguration.java     |   1 -
 .../java/org/apache/flink/graph/Vertex.java     |  26 +-
 .../flink/graph/example/CommunityDetection.java |   4 +-
 .../graph/example/IncrementalSSSPExample.java   | 304 +++++++++++
 .../example/utils/IncrementalSSSPData.java      |  91 ++++
 .../library/CommunityDetectionAlgorithm.java    |  13 +-
 .../library/LabelPropagationAlgorithm.java      |   9 +-
 .../flink/graph/library/PageRankAlgorithm.java  |  13 +-
 .../SingleSourceShortestPathsAlgorithm.java     |  10 +-
 .../flink/graph/spargel/MessagingFunction.java  |  75 ++-
 .../spargel/VertexCentricConfiguration.java     |  38 ++
 .../graph/spargel/VertexCentricIteration.java   | 528 +++++++++++++++----
 .../graph/spargel/VertexUpdateFunction.java     |  89 +++-
 .../test/CollectionModeSuperstepITCase.java     |  13 +-
 .../test/VertexCentricConfigurationITCase.java  |  16 +-
 ...ctedComponentsWithRandomisedEdgesITCase.java |   1 -
 .../test/example/IncrementalSSSPITCase.java     |  84 +++
 19 files changed, 1211 insertions(+), 170 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index abd489c..9089e09 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -438,6 +438,68 @@ public static final class Messenger extends MessagingFunction {...}
 
 {% endhighlight %}
 
+### Vertex-Centric Iteration Extensions
+A vertex-centric iteration can be extended with information such as the total number of vertices
+and the in-degree and out-degree of each vertex. Additionally, the neighborhood type (in/out/all) over which to
+run the vertex-centric iteration can be specified. By default, the updates from the in-neighbors are used
+to modify the current vertex's state, and messages are sent to the out-neighbors.
+
+In order to activate these options, the following parameters must be set to true:
+
+<strong>Number of Vertices</strong>: Accessing the total number of vertices within the iteration. This property
+can be set using the `setOptNumVertices()` method.
+
+The number of vertices can then be accessed in the vertex update function and in the messaging function
+using the `getNumberOfVertices()` method.
+
+<strong>Degrees</strong>: Accessing the in/out degree for a vertex within an iteration. This property can be set
+using the `setOptDegrees()` method.
+
+The in/out degrees can then be accessed in the vertex update function and in the messaging function, per vertex
+using `vertex.getInDegree()` or `vertex.getOutDegree()`.
+
+<strong>Messaging Direction</strong>: The direction in which messages are sent. This can be one of `EdgeDirection.IN`,
+`EdgeDirection.OUT` or `EdgeDirection.ALL`. The messaging direction also dictates the update direction, which is
+`EdgeDirection.OUT`, `EdgeDirection.IN` or `EdgeDirection.ALL`, respectively. This property can be set using the
+`setDirection()` method.
+
+{% highlight java %}
+Graph<Long, Double, Double> graph = ...
+
+// create the vertex-centric iteration
+VertexCentricIteration<Long, Double, Double, Double> iteration =
+			graph.createVertexCentricIteration(
+			new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
+
+// set the messaging direction
+iteration.setDirection(EdgeDirection.IN);
+
+// set the number of vertices option to true
+iteration.setOptNumVertices(true);
+
+// set the degree option to true
+iteration.setOptDegrees(true);
+
+// run the computation
+graph.runVertexCentricIteration(iteration);
+
+// user-defined functions
+public static final class VertexDistanceUpdater {
+	...
+	// get the number of vertices
+	long numVertices = getNumberOfVertices();
+	...
+}
+
+public static final class MinDistanceMessenger {
+	...
+	// take the out-degree of the vertex and subtract one
+	outDegree = vertex.getOutDegree() - 1;
+	...
+}
+
+{% endhighlight %}
+
 [Back to top](#top)
 
 

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 6e5b8f1..6b632bc 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1143,7 +1143,7 @@ public class Graph<K, VV, EV> {
 	/**
 	 * Runs a Vertex-Centric iteration on the graph.
 	 * No configuration options are provided.
-
+	 *
 	 * @param vertexUpdateFunction the vertex update function
 	 * @param messagingFunction the messaging function
 	 * @param maximumNumberOfIterations maximum number of iterations to perform
@@ -1181,7 +1181,7 @@ public class Graph<K, VV, EV> {
 
 		iteration.configure(parameters);
 
-		DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
+		DataSet<Vertex<K, VV>> newVertices = this.getVertices().runOperation(iteration);
 
 		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
index e1d4a1e..3086172 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/IterationConfiguration.java
@@ -22,7 +22,6 @@ import java.util.HashMap;
 import java.util.Map;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
-
 import com.google.common.base.Preconditions;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
index c5eb973..985ea45 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Vertex.java
@@ -31,11 +31,19 @@ public class Vertex<K, V> extends Tuple2<K, V> {
 
 	private static final long serialVersionUID = 1L;
 
-	public Vertex(){}
+	private Long inDegree;
+	private Long outDegree;
+
+	public Vertex(){
+		inDegree = 0L;
+		outDegree = 0L;
+	}
 
 	public Vertex(K k, V val) {
 		this.f0 = k;
 		this.f1 = val;
+		inDegree = 0L;
+		outDegree = 0L;
 	}
 
 	public K getId() {
@@ -53,4 +61,20 @@ public class Vertex<K, V> extends Tuple2<K, V> {
 	public void setValue(V val) {
 		this.f1 = val;
 	}
+
+	public Long getInDegree() {
+		return inDegree;
+	}
+
+	public void setInDegree(Long inDegree) {
+		this.inDegree = inDegree;
+	}
+
+	public Long getOutDegree() {
+		return outDegree;
+	}
+
+	public void setOutDegree(Long outDegree) {
+		this.outDegree = outDegree;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
index f9434d3..30005e7 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/CommunityDetection.java
@@ -64,8 +64,8 @@ public class CommunityDetection implements ProgramDescription {
 		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
 		Graph<Long, Long, Double> graph = Graph.fromDataSet(edges,
 				new MapFunction<Long, Long>() {
-					@Override
-					public Long map(Long label) throws Exception {
+
+					public Long map(Long label) {
 						return label;
 					}
 				}, env);

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
new file mode 100644
index 0000000..b06425d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
@@ -0,0 +1,304 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.graph.spargel.IterationConfiguration;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * Incremental Single Sink Shortest Paths Example.
+ *
+ * The program takes as input the graph resulting from an SSSP computation,
+ * an edge to be removed and the initial graph (i.e. before SSSP was computed).
+ *
+ * - If the removed edge does not belong to the SP-graph, no computation is necessary.
+ * The edge is simply removed from the graph.
+ * - If the removed edge is an SP-edge, then all nodes, whose shortest path contains the removed edge,
+ * potentially require re-computation.
+ * When the edge <u, v> is removed, v checks if it has another out-going SP-edge.
+ * If yes, no further computation is required.
+ * If v has no other out-going SP-edge, it invalidates its current value, by setting it to INF.
+ * Then, it informs all its SP-in-neighbors by sending them an INVALIDATE message.
+ * When a vertex u receives an INVALIDATE message from v, it checks whether it has another out-going SP-edge.
+ * If not, it invalidates its current value and propagates the INVALIDATE message.
+ * The propagation stops when a vertex with an alternative shortest path is reached
+ * or when we reach a vertex with no SP-in-neighbors.
+ *
+ * Usage <code>IncrementalSSSPExample &lt;vertex path&gt; &lt;edge path&gt; &lt;edges in SSSP&gt;
+ * &lt;edge to be removed&gt; &lt;result path&gt; &lt;number of iterations&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.IncrementalSSSPData}
+ */
+@SuppressWarnings("serial")
+public class IncrementalSSSPExample implements ProgramDescription {
+
+	public static void main(String [] args) throws Exception {
+		// stop right away when the command line arguments could not be parsed
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		// inputs: the vertices, all edges, the edges of the SSSP graph and the edge to remove
+		DataSet<Vertex<Long, Double>> vertices = getVerticesDataSet(env);
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		DataSet<Edge<Long, Double>> edgesInSSSP = getEdgesinSSSPDataSet(env);
+
+		Edge<Long, Double> edgeToBeRemoved = getEdgeToBeRemoved();
+
+		Graph<Long, Double, Double> graph = Graph.fromDataSet(vertices, edges, env);
+
+		// Assumption: all minimum weight paths are kept
+		Graph<Long, Double, Double> ssspGraph = Graph.fromDataSet(vertices, edgesInSSSP, env);
+
+		// remove the edge
+		graph.removeEdge(edgeToBeRemoved);
+		// NOTE(review): any value returned by removeEdge() is dropped and graph is not used below - confirm the removal has the intended effect
+		// configure the iteration
+		IterationConfiguration parameters = new IterationConfiguration();
+
+		if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
+			// the removed edge is an SSSP edge: use messaging direction IN and enable the degrees option
+			parameters.setDirection(EdgeDirection.IN);
+			parameters.setOptDegrees(true);
+
+			// run the vertex centric iteration to propagate info
+			Graph<Long, Double, Double> result = ssspGraph.runVertexCentricIteration(new VertexDistanceUpdater(),
+					new InvalidateMessenger(edgeToBeRemoved), maxIterations, parameters);
+
+			DataSet<Vertex<Long, Double>> resultedVertices = result.getVertices();
+
+			// Emit results
+			if(fileOutput) {
+				resultedVertices.writeAsCsv(outputPath, "\n", ",");
+			} else {
+				resultedVertices.print();
+			}
+
+			env.execute("Incremental SSSP Example");
+		} else {
+			// the removed edge is not an SSSP edge: emit the input vertices as they are
+			if(fileOutput) {
+				vertices.writeAsCsv(outputPath, "\n", ",");
+			} else {
+				vertices.print();
+			}
+
+			env.execute("Incremental SSSP Example");
+		}
+	}
+
+	@Override
+	public String getDescription() { // one-line description of this example program
+		return "Incremental Single Sink Shortest Paths Example";
+	}
+
+	// ******************************************************************************************************************
+	// IncrementalSSSP METHODS
+	// ******************************************************************************************************************
+
+	/**
+	 * Checks whether the edge to be removed is one of the SSSP edges.
+	 * The check counts the SSSP edges that are equal to the given edge.
+	 *
+	 * @param edgeToBeRemoved the edge that is looked up
+	 * @param edgesInSSSP the data set of SSSP edges that is searched
+	 * @return true if at least one edge in edgesInSSSP equals edgeToBeRemoved
+	 */
+	private static boolean isInSSSP(final Edge<Long, Double> edgeToBeRemoved, DataSet<Edge<Long, Double>> edgesInSSSP) throws Exception {
+		FilterFunction<Edge<Long, Double>> equalsRemovedEdge = new FilterFunction<Edge<Long, Double>>() {
+			@Override
+			public boolean filter(Edge<Long, Double> candidate) throws Exception {
+				return candidate.equals(edgeToBeRemoved);
+			}
+		};
+		return edgesInSSSP.filter(equalsRemovedEdge).count() > 0;
+	}
+
+	public static final class VertexDistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception {
+			if (inMessages.hasNext()) {
+				Long outDegree = vertex.getOutDegree() - 1;
+				// check if the vertex has another SP-Edge
+				if (outDegree > 0) {
+					// there is another shortest path from the source to this vertex
+				} else {
+					// set own value to infinity
+					setNewVertexValue(Double.MAX_VALUE);
+				}
+			}
+		}
+	}
+
+	public static final class InvalidateMessenger extends MessagingFunction<Long, Double, Double, Double> {
+
+		// the edge whose removal starts the invalidation
+		private Edge<Long, Double> edgeToBeRemoved;
+
+		public InvalidateMessenger(Edge<Long, Double> edgeToBeRemoved) {
+			this.edgeToBeRemoved = edgeToBeRemoved;
+		}
+
+		@Override
+		public void sendMessages(Vertex<Long, Double> vertex) throws Exception {
+			// every message carries Double.MAX_VALUE; only the set of recipients depends on the superstep
+			if (getSuperstepNumber() == 1) {
+				// first superstep: only the source vertex of the removed edge sends a message,
+				// and the message is addressed to that same source id
+				// NOTE(review): the earlier comment here read "activate the edge target" - confirm the recipient
+				if (vertex.getId().equals(edgeToBeRemoved.getSource())) {
+					sendMessageTo(edgeToBeRemoved.getSource(), Double.MAX_VALUE);
+				}
+			} else if (getSuperstepNumber() > 1) {
+				// later supersteps: message the source of every edge returned by getEdges()
+				for (Edge<Long, Double> incident : getEdges()) {
+					sendMessageTo(incident.getSource(), Double.MAX_VALUE);
+				}
+			}
+		}
+	}
+
+	// ******************************************************************************************************************
+	// UTIL METHODS
+	// ******************************************************************************************************************
+
+	private static boolean fileOutput = false;
+
+	private static String verticesInputPath = null;
+
+	private static String edgesInputPath = null;
+
+	private static String edgesInSSSPInputPath = null;
+
+	private static String edgeToBeRemoved = null;
+
+	private static String outputPath = null;
+
+	private static int maxIterations = 5;
+
+	private static boolean parseParameters(String[] args) {
+		// returns true when the program should run: no arguments (built-in data) or exactly six (file input)
+		if (args.length == 0) {
+			System.out.println("Executing IncrementalSSSP example with default parameters and built-in default data.");
+			System.out.println("Provide parameters to read input data from files.");
+			System.out.println("See the documentation for the correct format of input files.");
+			return true;
+		}
+		if (args.length != 6) {
+			System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> <output path> <max iterations>");
+			return false;
+		}
+		fileOutput = true;
+		verticesInputPath = args[0];
+		edgesInputPath = args[1];
+		edgesInSSSPInputPath = args[2];
+		edgeToBeRemoved = args[3];
+		outputPath = args[4];
+		maxIterations = Integer.parseInt(args[5]);
+		return true;
+	}
+
+	private static DataSet<Vertex<Long, Double>> getVerticesDataSet(ExecutionEnvironment env) {
+		// without input files, fall back to the built-in vertices (a usage hint goes to stderr)
+		if (!fileOutput) {
+			System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
+					"<output path> <max iterations>");
+			return IncrementalSSSPData.getDefaultVertexDataSet(env);
+		}
+		// each CSV line is read as a (Long, Double) pair and wrapped into a Vertex
+		return env.readCsvFile(verticesInputPath)
+				.lineDelimiter("\n")
+				.types(Long.class, Double.class)
+				.map(new MapFunction<Tuple2<Long, Double>, Vertex<Long, Double>>() {
+					@Override
+					public Vertex<Long, Double> map(Tuple2<Long, Double> idAndValue) throws Exception {
+						return new Vertex<Long, Double>(idAndValue.f0, idAndValue.f1);
+					}
+				});
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+		// without input files, fall back to the built-in edges (a usage hint goes to stderr)
+		if (!fileOutput) {
+			System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
+					"<output path> <max iterations>");
+			return IncrementalSSSPData.getDefaultEdgeDataSet(env);
+		}
+		// each CSV line is read as a (Long, Long, Double) triple and wrapped into a typed Edge
+		return env.readCsvFile(edgesInputPath)
+				.lineDelimiter("\n")
+				.types(Long.class, Long.class, Double.class)
+				.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
+					@Override
+					public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception {
+						return new Edge<Long, Double>(tuple3.f0, tuple3.f1, tuple3.f2);
+					}
+				});
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgesinSSSPDataSet(ExecutionEnvironment env) {
+		// without input files, fall back to the built-in SSSP edges (a usage hint goes to stderr)
+		if (!fileOutput) {
+			System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
+					"<output path> <max iterations>");
+			return IncrementalSSSPData.getDefaultEdgesInSSSP(env);
+		}
+		// each CSV line is read as a (Long, Long, Double) triple and wrapped into a typed Edge
+		return env.readCsvFile(edgesInSSSPInputPath)
+				.lineDelimiter("\n")
+				.types(Long.class, Long.class, Double.class)
+				.map(new MapFunction<Tuple3<Long, Long, Double>, Edge<Long, Double>>() {
+					@Override
+					public Edge<Long, Double> map(Tuple3<Long, Long, Double> tuple3) throws Exception {
+						return new Edge<Long, Double>(tuple3.f0, tuple3.f1, tuple3.f2);
+					}
+				});
+	}
+
+	private static Edge<Long, Double> getEdgeToBeRemoved() {
+		// without input files, use the built-in edge (a usage hint goes to stderr)
+		if (!fileOutput) {
+			System.err.println("Usage: IncrementalSSSP <vertex path> <edge path> <edges in SSSP> <edge to be removed> " +
+					"<output path> <max iterations>");
+			return IncrementalSSSPData.getDefaultEdgeToBeRemoved();
+		}
+		// the edge argument is split at commas: two Long fields followed by one Double field
+		String[] fields = edgeToBeRemoved.split(",");
+		return new Edge<Long, Double>(Long.parseLong(fields[0]), Long.parseLong(fields[1]),
+				Double.parseDouble(fields[2]));
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java
new file mode 100644
index 0000000..8c38d56
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/IncrementalSSSPData.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Vertex;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the IncrementalSSSP example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class IncrementalSSSPData {
+	// number of vertices in the default vertex data set
+	public static final int NUM_VERTICES = 5;
+	// the default vertices as CSV text, one "id,value" pair per line
+	public static final String VERTICES = "1,6.0\n" + "2,2.0\n" + "3,3.0\n" + "4,1.0\n" + "5,0.0";
+
+	public static DataSet<Vertex<Long, Double>> getDefaultVertexDataSet(ExecutionEnvironment env) {
+		// the same five vertices that VERTICES lists
+		List<Vertex<Long, Double>> vertices = new ArrayList<Vertex<Long, Double>>();
+		vertices.add(new Vertex<Long, Double>(1L, 6.0));
+		vertices.add(new Vertex<Long, Double>(2L, 2.0));
+		vertices.add(new Vertex<Long, Double>(3L, 3.0));
+		vertices.add(new Vertex<Long, Double>(4L, 1.0));
+		vertices.add(new Vertex<Long, Double>(5L, 0.0));
+
+		return env.fromCollection(vertices);
+	}
+	// the default edges as CSV text, one "id,id,value" triple per line
+	public static final String EDGES = "1,3,3.0\n" + "2,4,3.0\n" + "2,5,2.0\n" + "3,2,1.0\n" + "3,5,5.0\n" +
+			"4,5,1.0";
+
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+		// the same six edges that EDGES lists
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 3L, 3.0));
+		edges.add(new Edge<Long, Double>(2L, 4L, 3.0));
+		edges.add(new Edge<Long, Double>(2L, 5L, 2.0));
+		edges.add(new Edge<Long, Double>(3L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(3L, 5L, 5.0));
+		edges.add(new Edge<Long, Double>(4L, 5L, 1.0));
+
+		return env.fromCollection(edges);
+	}
+	// the subset of EDGES that serves as the default SSSP edges, as CSV text
+	public static final String EDGES_IN_SSSP = "1,3,3.0\n" + "2,5,2.0\n" + "3,2,1.0\n" + "4,5,1.0";
+
+	public static DataSet<Edge<Long, Double>> getDefaultEdgesInSSSP(ExecutionEnvironment env) {
+		// the same four edges that EDGES_IN_SSSP lists
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 3L, 3.0));
+		edges.add(new Edge<Long, Double>(2L, 5L, 2.0));
+		edges.add(new Edge<Long, Double>(3L, 2L, 1.0));
+		edges.add(new Edge<Long, Double>(4L, 5L, 1.0));
+
+		return env.fromCollection(edges);
+	}
+	// the default edge to remove as CSV text; it is one of the EDGES_IN_SSSP entries
+	public static final String EDGE_TO_BE_REMOVED = "2,5,2.0";
+
+	public static Edge<Long, Double> getDefaultEdgeToBeRemoved() {
+		// the same edge that EDGE_TO_BE_REMOVED describes
+		return new Edge<Long, Double>(2L, 5L, 2.0);
+	}
+	// presumably the expected vertex values after running the example on the default data - verify against the test
+	public static final String RESULTED_VERTICES = "1," + Double.MAX_VALUE + "\n" + "2," + Double.MAX_VALUE+ "\n"
+			+ "3," + Double.MAX_VALUE + "\n" + "4,1.0\n" + "5,0.0";
+	// holder of constants and static factories only; not meant to be instantiated
+	private IncrementalSSSPData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
index 47e3595..6f72deb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetectionAlgorithm.java
@@ -79,7 +79,7 @@ public class CommunityDetectionAlgorithm implements GraphAlgorithm<Long, Long, D
 		}
 
 		@Override
-		public void updateVertex(Long vertexKey, Tuple2<Long, Double> labelScore,
+		public void updateVertex(Vertex<Long, Tuple2<Long, Double>> vertex,
 								MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
 
 			// we would like these two maps to be ordered
@@ -116,7 +116,7 @@ public class CommunityDetectionAlgorithm implements GraphAlgorithm<Long, Long, D
 			if(receivedLabelsWithScores.size() > 0) {
 				// find the label with the highest score from the ones received
 				Double maxScore = -Double.MAX_VALUE;
-				Long maxScoreLabel = labelScore.f0;
+				Long maxScoreLabel = vertex.getValue().f0;
 				for (Long curLabel : receivedLabelsWithScores.keySet()) {
 
 					if (receivedLabelsWithScores.get(curLabel) > maxScore) {
@@ -128,7 +128,7 @@ public class CommunityDetectionAlgorithm implements GraphAlgorithm<Long, Long, D
 				// find the highest score of maxScoreLabel
 				Double highestScore = labelsWithHighestScore.get(maxScoreLabel);
 				// re-score the new label
-				if (maxScoreLabel != labelScore.f0) {
+				if (maxScoreLabel != vertex.getValue().f0) {
 					highestScore -= delta / getSuperstepNumber();
 				}
 				// else delta = 0
@@ -143,10 +143,11 @@ public class CommunityDetectionAlgorithm implements GraphAlgorithm<Long, Long, D
 			Tuple2<Long, Double>, Double> {
 
 		@Override
-		public void sendMessages(Long vertexKey, Tuple2<Long, Double> vertexValue) throws Exception {
+		public void sendMessages(Vertex<Long, Tuple2<Long, Double>> vertex) throws Exception {
 
-			for(Edge<Long, Double> edge : getOutgoingEdges()) {
-				sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertexValue.f0, vertexValue.f1 * edge.getValue()));
+			for(Edge<Long, Double> edge : getEdges()) {
+				sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0,
+						vertex.getValue().f1 * edge.getValue()));
 			}
 
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
index 7fc0614..0b0f4fc 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagationAlgorithm.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.library;
 
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
@@ -66,12 +67,12 @@ public class LabelPropagationAlgorithm<K extends Comparable<K> & Serializable>
 	 */
 	public static final class UpdateVertexLabel<K> extends VertexUpdateFunction<K, Long, Long> {
 
-		public void updateVertex(K vertexKey, Long vertexValue,
+		public void updateVertex(Vertex<K, Long> vertex,
 				MessageIterator<Long> inMessages) {
 			Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>();
 
 			long maxFrequency = 1;
-			long mostFrequentLabel = vertexValue;
+			long mostFrequentLabel = vertex.getValue();
 
 			// store the labels with their frequencies
 			for (Long msg : inMessages) {
@@ -107,8 +108,8 @@ public class LabelPropagationAlgorithm<K extends Comparable<K> & Serializable>
 	 */
 	public static final class SendNewLabelToNeighbors<K> extends MessagingFunction<K, Long, Long, NullValue> {
 
-		public void sendMessages(K vertexKey, Long newLabel) {
-			sendMessageToAllNeighbors(newLabel);
+		public void sendMessages(Vertex<K, Long> vertex) {
+			sendMessageToAllNeighbors(vertex.getValue());
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
index 4803b44..f63fb0d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRankAlgorithm.java
@@ -21,6 +21,7 @@ package org.apache.flink.graph.library;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
@@ -63,8 +64,7 @@ public class PageRankAlgorithm<K extends Comparable<K> & Serializable> implement
 		}
 
 		@Override
-		public void updateVertex(K vertexKey, Double vertexValue,
-				MessageIterator<Double> inMessages) {
+		public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
 			double rankSum = 0.0;
 			for (double msg : inMessages) {
 				rankSum += msg;
@@ -91,13 +91,14 @@ public class PageRankAlgorithm<K extends Comparable<K> & Serializable> implement
 		}
 
 		@Override
-		public void sendMessages(K vertexId, Double newRank) {
+		public void sendMessages(Vertex<K, Double> vertex) {
 			if (getSuperstepNumber() == 1) {
 				// initialize vertex ranks
-				newRank = 1.0 / numVertices;
+				vertex.setValue(new Double(1.0 / numVertices));
 			}
-			for (Edge<K, Double> edge : getOutgoingEdges()) {
-				sendMessageTo(edge.getTarget(), newRank * edge.getValue());
+
+			for (Edge<K, Double> edge : getEdges()) {
+				sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
index 41180cd..e78ae3e 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPathsAlgorithm.java
@@ -75,7 +75,7 @@ public class SingleSourceShortestPathsAlgorithm<K extends Comparable<K> & Serial
 	public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
 
 		@Override
-		public void updateVertex(K vertexKey, Double vertexValue,
+		public void updateVertex(Vertex<K, Double> vertex,
 				MessageIterator<Double> inMessages) {
 
 			Double minDistance = Double.MAX_VALUE;
@@ -86,7 +86,7 @@ public class SingleSourceShortestPathsAlgorithm<K extends Comparable<K> & Serial
 				}
 			}
 
-			if (vertexValue > minDistance) {
+			if (vertex.getValue() > minDistance) {
 				setNewVertexValue(minDistance);
 			}
 		}
@@ -101,10 +101,10 @@ public class SingleSourceShortestPathsAlgorithm<K extends Comparable<K> & Serial
 	public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
 
 		@Override
-		public void sendMessages(K vertexKey, Double newDistance)
+		public void sendMessages(Vertex<K, Double> vertex)
 				throws Exception {
-			for (Edge<K, Double> edge : getOutgoingEdges()) {
-				sendMessageTo(edge.getTarget(), newDistance + edge.getValue());
+			for (Edge<K, Double> edge : getEdges()) {
+				sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index ab60f15..724a890 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -26,7 +26,10 @@ import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
@@ -41,21 +44,50 @@ import org.apache.flink.util.Collector;
 public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
-	
+
+	// --------------------------------------------------------------------------------------------
+	//  Attributes that allow vertices to access their in/out degrees and the total number of vertices
+	//  inside an iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private long numberOfVertices;
+
+	public long getNumberOfVertices() {
+		return numberOfVertices;
+	}
+
+	void setNumberOfVertices(long numberOfVertices) {
+		this.numberOfVertices = numberOfVertices;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Attribute that allows the user to choose the neighborhood type (in/out/all) on which to run
+	//  the vertex centric iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private EdgeDirection direction;
+
+	public EdgeDirection getDirection() {
+		return direction;
+	}
+
+	public void setDirection(EdgeDirection direction) {
+		this.direction = direction;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Public API Methods
 	// --------------------------------------------------------------------------------------------
-	
+
 	/**
 	 * This method is invoked once per superstep for each vertex that was changed in that superstep.
 	 * It needs to produce the messages that will be received by vertices in the next superstep.
 	 * 
-	 * @param vertexKey The key of the vertex that was changed.
-	 * @param vertexValue The value (state) of the vertex that was changed.
+	 * @param vertex The vertex that was changed.
 	 * 
 	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
 	 */
-	public abstract void sendMessages(VertexKey vertexKey, VertexValue vertexValue) throws Exception;
+	public abstract void sendMessages(Vertex<VertexKey, VertexValue> vertex) throws Exception;
 	
 	/**
 	 * This method is executed one per superstep before the vertex update function is invoked for each vertex.
@@ -73,30 +105,30 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
 	
 	
 	/**
-	 * Gets an {@link java.lang.Iterable} with all outgoing edges. This method is mutually exclusive with
+	 * Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with
 	 * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
 	 * 
 	 * @return An iterator with all outgoing edges.
 	 */
 	@SuppressWarnings("unchecked")
-	public Iterable<Edge<VertexKey, EdgeValue>> getOutgoingEdges() {
+	public Iterable<Edge<VertexKey, EdgeValue>> getEdges() {
 		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
+			throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllTargets()' exactly once.");
 		}
 		edgesUsed = true;
 		this.edgeIterator.set((Iterator<Edge<VertexKey, EdgeValue>>) edges);
 		return this.edgeIterator;
 	}
-	
+
 	/**
 	 * Sends the given message to all vertices that are targets of an outgoing edge of the changed vertex.
-	 * This method is mutually exclusive to the method {@link #getOutgoingEdges()} and may be called only once.
+	 * This method is mutually exclusive with the method {@link #getEdges()} and may be called only once.
 	 * 
 	 * @param m The message to send.
 	 */
 	public void sendMessageToAllNeighbors(Message m) {
 		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getOutgoingEdges()' or 'sendMessageToAllTargets()' exactly once.");
+			throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllTargets()' exactly once.");
 		}
 		
 		edgesUsed = true;
@@ -216,6 +248,7 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
 		@Override
 		public Edge<VertexKey, EdgeValue> next() {
 			Edge<VertexKey, EdgeValue> next = input.next();
+			edge.setSource(next.f0);
 			edge.setTarget(next.f1);
 			edge.setValue(next.f2);
 			return edge;
@@ -230,4 +263,24 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
 			return this;
 		}
 	}
+
+	/**
+	 * In order to hide the Tuple3(actualValue, inDegree, outDegree) vertex value from the user,
+	 * this function is called from {@link org.apache.flink.graph.spargel.VertexCentricIteration}.
+	 *
+	 * It copies the id and the actual value of the given vertex state into a new vertex, sets the
+	 * in- and out-degree on that vertex, and then calls the regular sendMessages function with it.
+	 *
+	 * @param newVertexState the vertex whose value is a Tuple3 of (actual value, in-degree, out-degree)
+	 * @throws Exception if the regular sendMessages function throws one
+	 */
+	void sendMessagesFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> newVertexState)
+			throws Exception {
+		Vertex<VertexKey, VertexValue> vertex = new Vertex<VertexKey, VertexValue>(newVertexState.getId(),
+				newVertexState.getValue().f0);
+		vertex.setInDegree(newVertexState.getValue().f1);
+		vertex.setOutDegree(newVertexState.getValue().f2);
+
+		sendMessages(vertex);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
index 4b353d5..549a795 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.spargel;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.IterationConfiguration;
 
 import java.util.ArrayList;
@@ -43,6 +44,15 @@ public class VertexCentricConfiguration extends IterationConfiguration {
 	/** the broadcast variables for the messaging function **/
 	private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>();
 
+	/** flag that defines whether the degrees option is set **/
+	private boolean optDegrees = false;
+
+	/** flag that defines whether the number of vertices option is set **/
+	private boolean optNumVertices = false;
+
+	/** the direction in which the messages should be sent **/
+	private EdgeDirection direction = EdgeDirection.OUT;
+
 	public VertexCentricConfiguration() {}
 
 	/**
@@ -85,5 +95,33 @@ public class VertexCentricConfiguration extends IterationConfiguration {
 		return this.bcVarsMessaging;
 	}
 
+	// ----------------------------------------------------------------------------------
+	// The direction, degrees and the total number of vertices should be optional.
+	// The user can access them by setting the direction, degrees or the numVertices options.
+	// ----------------------------------------------------------------------------------
+
+	public boolean isOptDegrees() {
+		return optDegrees;
+	}
+
+	public void setOptDegrees(boolean optDegrees) {
+		this.optDegrees = optDegrees;
+	}
+
+	public boolean isOptNumVertices() {
+		return optNumVertices;
+	}
+
+	public void setOptNumVertices(boolean optNumVertices) {
+		this.optNumVertices = optNumVertices;
+	}
+
+	public EdgeDirection getDirection() {
+		return direction;
+	}
+
+	public void setDirection(EdgeDirection direction) {
+		this.direction = direction;
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index 79da664..95c6090 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -22,6 +22,11 @@ import java.util.Iterator;
 import java.util.Map;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
@@ -29,11 +34,14 @@ import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Collector;
 
@@ -71,7 +79,7 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	implements CustomUnaryOperation<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
 {
 	private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
-	
+
 	private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
 	
 	private final DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue;
@@ -83,7 +91,8 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	private DataSet<Vertex<VertexKey, VertexValue>> initialVertices;
 
 	private VertexCentricConfiguration configuration;
-	
+
+	private DataSet<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> verticesWithDegrees;
 	// ----------------------------------------------------------------------------------
 	
 	private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
@@ -135,69 +144,46 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 		if (this.initialVertices == null) {
 			throw new IllegalStateException("The input data set has not been set.");
 		}
-		
+
 		// prepare some type information
-		TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
 		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
 		TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
 
-		final int[] zeroKeyPos = new int[] {0};
-	
-		final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
-			this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
+		// create a graph
+		Graph<VertexKey, VertexValue, EdgeValue> graph =
+				Graph.fromDataSet(initialVertices, edgesWithValue, ExecutionEnvironment.getExecutionEnvironment());
 
-		// set up the iteration operator
-		if (this.configuration != null) {
+		// check whether the numVertices option is set and, if so, compute the total number of vertices
+		// and set it within the messaging and update functions
 
-			iteration.name(this.configuration.getName(
-					"Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"));
-			iteration.parallelism(this.configuration.getParallelism());
-			iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
-
-			// register all aggregators
-			for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
-				iteration.registerAggregator(entry.getKey(), entry.getValue());
+		if (this.configuration !=null && this.configuration.isOptNumVertices()) {
+			try {
+				long numberOfVertices = graph.numberOfVertices();
+				messagingFunction.setNumberOfVertices(numberOfVertices);
+				updateFunction.setNumberOfVertices(numberOfVertices);
+			} catch (Exception e) {
+				e.printStackTrace();
 			}
 		}
-		else {
-			// no configuration provided; set default name
-			iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
-		}
-		
-		// build the messaging function (co group)
-		CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
-		MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> messenger = new MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue>(messagingFunction, messageTypeInfo);
-		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(0).equalTo(0).with(messenger);
-		
-		// configure coGroup message function with name and broadcast variables
-		messages = messages.name("Messaging");
 
-		if (this.configuration != null) {
-			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
-				messages = messages.withBroadcastSet(e.f1, e.f0);
-			}			
+		if(this.configuration != null) {
+			messagingFunction.setDirection(this.configuration.getDirection());
+		} else {
+			messagingFunction.setDirection(EdgeDirection.OUT);
 		}
-		
-		VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
-		
-		// build the update function (co group)
-		CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates =
-				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
-		
-		// configure coGroup update function with name and broadcast variables
-		updates = updates.name("Vertex State Updates");
 
-		if (this.configuration != null) {
-			for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) {
-				updates = updates.withBroadcastSet(e.f1, e.f0);
-			}			
-		}
+		// retrieve the direction in which the updates are made and in which the messages are sent
+		EdgeDirection messagingDirection = messagingFunction.getDirection();
 
-		// let the operator know that we preserve the key field
-		updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
-		
-		return iteration.closeWith(updates, updates);
-		
+		DataSet<Tuple2<VertexKey, Message>> messages = null;
+
+		// check whether the degrees option is set and, if so, compute the in and the out degrees and
+		// add them to the vertex value
+		if(this.configuration != null && this.configuration.isOptDegrees()) {
+			return createResultVerticesWithDegrees(graph, messagingDirection, messages, messageTypeInfo);
+		} else {
+			return createResultSimpleVertex(messagingDirection, messages, messageTypeInfo);
+		}
 	}
 
 	/**
@@ -224,47 +210,89 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	{
 		return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations);
 	}
-	
+
+	/**
+	 * Configures this vertex-centric iteration with the provided parameters.
+	 *
+	 * @param parameters the configuration parameters
+	 */
+	public void configure(VertexCentricConfiguration parameters) {
+		this.configuration = parameters;
+	}
+
+	/**
+	 * @return the configuration parameters of this vertex-centric iteration
+	 */
+	public VertexCentricConfiguration getIterationConfiguration() {
+		return this.configuration;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Wrapping UDFs
 	// --------------------------------------------------------------------------------------------
-	
+
 	private static final class VertexUpdateUdf<VertexKey, VertexValue, Message> 
 		extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
 		implements ResultTypeQueryable<Vertex<VertexKey, VertexValue>>
 	{
 		private static final long serialVersionUID = 1L;
 		
-		private final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
+		final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
 
-		private final MessageIterator<Message> messageIter = new MessageIterator<Message>();
+		final MessageIterator<Message> messageIter = new MessageIterator<Message>();
 		
-		private transient TypeInformation<Vertex<VertexKey, VertexValue>> resultType;
+		private transient TypeInformation<Vertex<VertexKey, VV>> resultType;
 		
 		
 		private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
-				TypeInformation<Vertex<VertexKey, VertexValue>> resultType)
+				TypeInformation<Vertex<VertexKey, VV>> resultType)
 		{
 			this.vertexUpdateFunction = vertexUpdateFunction;
 			this.resultType = resultType;
 		}
 
 		@Override
-		public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages, Iterable<Vertex<VertexKey, VertexValue>> vertex,
-				Collector<Vertex<VertexKey, VertexValue>> out)
-			throws Exception
-		{
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.vertexUpdateFunction.init(getIterationRuntimeContext());
+			}
+			this.vertexUpdateFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.vertexUpdateFunction.postSuperstep();
+		}
+
+		@Override
+		public TypeInformation<Vertex<VertexKey, VV>> getProducedType() {
+			return this.resultType;
+		}
+	}
+
+	private static final class VertexUpdateUdfSimpleVertexValue<VertexKey, VertexValue, Message>
+		extends VertexUpdateUdf<VertexKey, VertexValue, Message> {
+
+
+		private VertexUpdateUdfSimpleVertexValue(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, TypeInformation<Vertex<VertexKey, VertexValue>> resultType) {
+			super(vertexUpdateFunction, resultType);
+		}
+
+		@Override
+		public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages,
+							Iterable<Vertex<VertexKey, VertexValue>> vertex,
+							Collector<Vertex<VertexKey, VertexValue>> out) throws Exception {
 			final Iterator<Vertex<VertexKey, VertexValue>> vertexIter = vertex.iterator();
-			
+
 			if (vertexIter.hasNext()) {
 				Vertex<VertexKey, VertexValue> vertexState = vertexIter.next();
-				
+
 				@SuppressWarnings("unchecked")
 				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
 				messageIter.setSource(downcastIter);
-				
+
 				vertexUpdateFunction.setOutput(vertexState, out);
-				vertexUpdateFunction.updateVertex(vertexState.f0, vertexState.f1, messageIter);
+				vertexUpdateFunction.updateVertex(vertexState, messageIter);
 			}
 			else {
 				final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
@@ -280,23 +308,45 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 				}
 			}
 		}
-		
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
-				this.vertexUpdateFunction.init(getIterationRuntimeContext());
-			}
-			this.vertexUpdateFunction.preSuperstep();
-		}
-		
-		@Override
-		public void close() throws Exception {
-			this.vertexUpdateFunction.postSuperstep();
+	}
+
+	private static final class VertexUpdateUdfVertexValueWithDegrees<VertexKey,	VertexValue, Message> extends VertexUpdateUdf<VertexKey,
+			Tuple3<VertexValue, Long, Long>, VertexValue, Message> {
+
+
+		private VertexUpdateUdfVertexValueWithDegrees(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, TypeInformation<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> resultType) {
+			super(vertexUpdateFunction, resultType);
 		}
 
 		@Override
-		public TypeInformation<Vertex<VertexKey, VertexValue>> getProducedType() {
-			return this.resultType;
+		public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages,
+							Iterable<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertex,
+							Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> out) throws Exception {
+			final Iterator<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertexIter = vertex.iterator();
+
+			if (vertexIter.hasNext()) {
+				Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertexState = vertexIter.next();
+
+				@SuppressWarnings("unchecked")
+				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
+				messageIter.setSource(downcastIter);
+
+				vertexUpdateFunction.setOutputWithDegrees(vertexState, out);
+				vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexState, messageIter);
+			}
+			else {
+				final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
+				if (messageIter.hasNext()) {
+					String message = "Target vertex does not exist!.";
+					try {
+						Tuple2<VertexKey, Message> next = messageIter.next();
+						message = "Target vertex '" + next.f0 + "' does not exist!.";
+					} catch (Throwable t) {}
+					throw new Exception(message);
+				} else {
+					throw new Exception();
+				}
+			}
 		}
 	}
 
@@ -309,31 +359,17 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	{
 		private static final long serialVersionUID = 1L;
 		
-		private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
+		final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
 		
 		private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
-		
-		
+
+
 		private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
 				TypeInformation<Tuple2<VertexKey, Message>> resultType)
 		{
 			this.messagingFunction = messagingFunction;
 			this.resultType = resultType;
 		}
-
-		@Override
-		public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
-				Iterable<Vertex<VertexKey, VertexValue>> state, Collector<Tuple2<VertexKey, Message>> out)
-			throws Exception
-		{
-			final Iterator<Vertex<VertexKey, VertexValue>> stateIter = state.iterator();
-			
-			if (stateIter.hasNext()) {
-				Vertex<VertexKey, VertexValue> newVertexState = stateIter.next();
-				messagingFunction.set((Iterator<?>) edges.iterator(), out);
-				messagingFunction.sendMessages(newVertexState.f0, newVertexState.f1);
-			}
-		}
 		
 		@Override
 		public void open(Configuration parameters) throws Exception {
@@ -355,19 +391,301 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 		}
 	}
 
+	private static final class MessagingUdfWithEdgeValuesSimpleVertexValue<VertexKey, VertexValue, Message, EdgeValue>
+			extends MessagingUdfWithEdgeValues<VertexKey, VertexValue, VertexValue, Message, EdgeValue> {
+		// Messaging co-group UDF for the case where the vertex state carries the plain vertex value.
+		private MessagingUdfWithEdgeValuesSimpleVertexValue(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
+															TypeInformation<Tuple2<VertexKey, Message>> resultType) {
+			super(messagingFunction, resultType);
+		}
+		// Invokes the user's sendMessages() for the vertex found on the state side of the co-group.
+		@Override
+		public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
+							Iterable<Vertex<VertexKey, VertexValue>> state,
+							Collector<Tuple2<VertexKey, Message>> out) throws Exception {
+			final Iterator<Vertex<VertexKey, VertexValue>> stateIter = state.iterator();
+
+			if (stateIter.hasNext()) { // an empty state group sends nothing
+				Vertex<VertexKey, VertexValue> newVertexState = stateIter.next(); // only the first state element is read
+				messagingFunction.set((Iterator<?>) edges.iterator(), out); // hand over this group's edges and the collector
+				messagingFunction.sendMessages(newVertexState);
+			}
+		}
+	}
+
+	private static final class MessagingUdfWithEdgeValuesVertexValueWithDegrees<VertexKey, VertexValue, Message, EdgeValue>
+			extends MessagingUdfWithEdgeValues<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message, EdgeValue> {
+		// Messaging co-group UDF for the case where the vertex state value is a Tuple3 holding the
+		// vertex value plus two Long fields (the degrees).
+		private MessagingUdfWithEdgeValuesVertexValueWithDegrees
+				(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
+				TypeInformation<Tuple2<VertexKey, Message>> resultType) {
+			super(messagingFunction, resultType);
+		}
+
+		@Override
+		public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
+							Iterable<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> state,
+							Collector<Tuple2<VertexKey, Message>> out) throws Exception {
+
+			final Iterator<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> stateIter = state.iterator();
+
+			if (stateIter.hasNext()) { // an empty state group sends nothing
+				Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> newVertexState = stateIter.next(); // only the first state element is read
+				messagingFunction.set((Iterator<?>) edges.iterator(), out); // hand over this group's edges and the collector
+				messagingFunction.sendMessagesFromVertexCentricIteration(newVertexState); // presumably unwraps the Tuple3 before calling sendMessages() - confirm in MessagingFunction
+			}
+		}
+	}
+
+
+	// --------------------------------------------------------------------------------------------
+	//  UTIL methods
+	// --------------------------------------------------------------------------------------------
+
 	/**
-	 * Configures this vertex-centric iteration with the provided parameters.
+	 * Builds the messaging function as a coGroup of the edges with the iteration workset, for
+	 * simple vertices (without degrees).
+	 * The operator is then named "Messaging" and given the configured messaging broadcast variables.
 	 *
-	 * @param parameters the configuration parameters
+	 * @param iteration the delta iteration whose workset supplies the vertex state
+	 * @param messageTypeInfo the type information of the produced (vertex key, message) tuples
+	 * @param whereArg the key field position in the edges (first coGroup input)
+	 * @param equalToArg the key field position in the workset vertices (second coGroup input)
+	 * @return the coGroup operator that emits the messages
 	 */
-	public void configure(VertexCentricConfiguration parameters) {
-		this.configuration = parameters;
+	private CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> buildMessagingFunction(
+			DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration,
+			TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+
+		// build the messaging function (co group)
+		CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
+		MessagingUdfWithEdgeValues<VertexKey, VertexValue, VertexValue, Message, EdgeValue> messenger =
+				new MessagingUdfWithEdgeValuesSimpleVertexValue<VertexKey, VertexValue, Message, EdgeValue>(
+						messagingFunction, messageTypeInfo);
+
+		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
+				.equalTo(equalToArg).with(messenger);
+
+		// configure coGroup message function with name and broadcast variables
+		messages = messages.name("Messaging");
+		if(this.configuration != null) { // broadcast variables can only come from a supplied configuration
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
+				messages = messages.withBroadcastSet(e.f1, e.f0); // f1 = data set, f0 = broadcast variable name
+			}
+		}
+
+		return messages;
 	}
 
 	/**
-	 * @return the configuration parameters of this vertex-centric iteration
+	 * Builds the messaging function as a coGroup of the edges with the iteration workset, for
+	 * vertices whose value also carries degree information.
+	 * The operator is then named "Messaging" and given the configured messaging broadcast variables.
+	 *
+	 * @param iteration the delta iteration whose workset supplies the degree-carrying vertex state
+	 * @param messageTypeInfo the type information of the produced (vertex key, message) tuples
+	 * @param whereArg the key field position in the edges (first coGroup input)
+	 * @param equalToArg the key field position in the workset vertices (second coGroup input)
+	 * @return the coGroup operator that emits the messages
 	 */
-	public VertexCentricConfiguration getIterationConfiguration() {
-		return this.configuration;
+	private CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> buildMessagingFunctionVerticesWithDegrees(
+			DeltaIteration<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>,
+					Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> iteration,
+			TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+
+		// build the messaging function (co group)
+		CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
+		MessagingUdfWithEdgeValues<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message, EdgeValue> messenger =
+				new MessagingUdfWithEdgeValuesVertexValueWithDegrees<VertexKey, VertexValue, Message, EdgeValue>(
+						messagingFunction, messageTypeInfo);
+
+		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
+				.equalTo(equalToArg).with(messenger);
+
+		// configure coGroup message function with name and broadcast variables
+		messages = messages.name("Messaging");
+
+		if (this.configuration != null) { // broadcast variables can only come from a supplied configuration
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
+				messages = messages.withBroadcastSet(e.f1, e.f0); // f1 = data set, f0 = broadcast variable name
+			}
+		}
+
+		return messages;
+	}
+
+	/**
+	 * Sets up the delta iteration for the given vertices (vertex value either simple or with degrees).
+	 *
+	 * @param vertices the vertices used as both the initial solution set and the initial workset
+	 * @param <VV> the type of the vertex value
+	 * @return the delta iteration, named and - when a configuration is present - configured from it
+	 */
+	private <VV> DeltaIteration<Vertex<VertexKey, VV>, Vertex<VertexKey, VV>> setUpIteration(
+			DataSet<Vertex<VertexKey, VV>> vertices) {
+
+		final int[] zeroKeyPos = new int[] {0}; // the solution set is keyed on field 0 of the vertex
+
+		final DeltaIteration<Vertex<VertexKey, VV>, Vertex<VertexKey, VV>> iteration =
+				vertices.iterateDelta(vertices, this.maximumNumberOfIterations, zeroKeyPos);
+
+		// set up the iteration operator
+		if (this.configuration != null) {
+			// the string passed to getName() is the same text used as the default name in the else branch
+			iteration.name(this.configuration.getName("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"));
+			iteration.parallelism(this.configuration.getParallelism());
+			iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
+
+			// register all aggregators
+			for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
+				iteration.registerAggregator(entry.getKey(), entry.getValue());
+			}
+		}
+		else {
+			// no configuration provided; set default name
+			iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
+		}
+
+		return iteration;
+	}
+
+	/**
+	 * Creates the operator that represents this vertex centric graph computation for a simple vertex.
+	 *
+	 * @param messagingDirection the edge direction used for messaging (IN, OUT or ALL)
+	 * @param messages overwritten before it is read; the value passed in is not used
+	 * @param messageTypeInfo the type information of the (vertex key, message) tuples
+	 * @return the operator
+	 */
+	private DataSet<Vertex<VertexKey, VertexValue>> createResultSimpleVertex(EdgeDirection messagingDirection,
+																			DataSet<Tuple2<VertexKey, Message>> messages,
+																			TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo) {
+		TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
+
+		final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
+				setUpIteration(this.initialVertices);
+
+		switch (messagingDirection) {
+			case IN:
+				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0); // edges keyed on field 1
+				break;
+			case OUT:
+				messages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0); // edges keyed on field 0
+				break;
+			case ALL:
+				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0)
+						.union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0)) ;
+				break;
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+
+		VertexUpdateUdf<VertexKey, VertexValue, VertexValue, Message> updateUdf =
+				new VertexUpdateUdfSimpleVertexValue<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
+
+		// build the update function (co group)
+		CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates =
+				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+
+		configureUpdateFunction(updates);
+
+		return iteration.closeWith(updates, updates); // updates are both the solution set delta and the next workset
+	}
+
+	/**
+	 * Creates the operator that represents this vertex centric graph computation for a vertex with in
+	 * and out degrees added to the vertex value.
+	 *
+	 * @param graph the graph whose inDegrees() and outDegrees() are joined onto the initial vertices
+	 * @param messagingDirection the edge direction used for messaging (IN, OUT or ALL)
+	 * @param messages overwritten before it is read; the value passed in is not used
+	 * @param messageTypeInfo the type information of the (vertex key, message) tuples
+	 * @return the operator
+	 */
+	private DataSet<Vertex<VertexKey, VertexValue>> createResultVerticesWithDegrees(
+			Graph<VertexKey, VertexValue, EdgeValue> graph,
+			EdgeDirection messagingDirection,
+			DataSet<Tuple2<VertexKey, Message>> messages,
+			TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo) {
+		// NOTE(review): configuration is dereferenced without a null check, unlike the other helpers - confirm the caller guarantees it.
+		this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
+
+		DataSet<Tuple2<VertexKey, Long>> inDegrees = graph.inDegrees();
+		DataSet<Tuple2<VertexKey, Long>> outDegrees = graph.outDegrees();
+		// NOTE(review): both joins below are inner joins; a vertex missing from either degree set would be dropped - confirm both cover all vertices.
+		DataSet<Tuple3<VertexKey, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
+				.with(new FlatJoinFunction<Tuple2<VertexKey, Long>, Tuple2<VertexKey, Long>, Tuple3<VertexKey, Long, Long>>() {
+
+					@Override
+					public void join(Tuple2<VertexKey, Long> first, Tuple2<VertexKey, Long> second, Collector<Tuple3<VertexKey, Long, Long>> out) throws Exception {
+						out.collect(new Tuple3<VertexKey, Long, Long>(first.f0, first.f1, second.f1)); // (key, in-degree, out-degree)
+					}
+				});
+		// wrap every vertex value as (original value, in-degree, out-degree)
+		DataSet<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> verticesWithDegrees= initialVertices
+				.join(degrees).where(0).equalTo(0)
+				.with(new FlatJoinFunction<Vertex<VertexKey,VertexValue>, Tuple3<VertexKey,Long,Long>, Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>>() {
+					@Override
+					public void join(Vertex<VertexKey, VertexValue> vertex,
+									Tuple3<VertexKey, Long, Long> degrees,
+									Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> out) throws Exception {
+
+						out.collect(new Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>(vertex.getId(),
+								new Tuple3<VertexValue, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
+					}
+				});
+
+		// add type info
+		TypeInformation<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
+
+		final DeltaIteration<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>,
+				Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> iteration =
+				setUpIteration(verticesWithDegrees);
+
+		switch (messagingDirection) {
+			case IN:
+				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0); // edges keyed on field 1
+				break;
+			case OUT:
+				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0); // edges keyed on field 0
+				break;
+			case ALL:
+				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0)
+						.union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0)) ;
+				break;
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+
+		VertexUpdateUdf<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message> updateUdf =
+				new VertexUpdateUdfVertexValueWithDegrees<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
+
+		// build the update function (co group)
+		CoGroupOperator<?, ?, Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> updates =
+				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+
+		configureUpdateFunction(updates);
+		// close the iteration, then strip the degrees so the result again carries the plain vertex value
+		return iteration.closeWith(updates, updates).map(new MapFunction<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>, Vertex<VertexKey, VertexValue>>() {
+			@Override
+			public Vertex<VertexKey, VertexValue> map(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertex) throws Exception {
+				return new Vertex<VertexKey, VertexValue>(vertex.getId(), vertex.getValue().f0);
+			}
+		});
+	}
+
+	private <VV> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<VertexKey, VV>> updates) {
+		// NOTE(review): 'updates' is only reassigned locally; presumably name()/withBroadcastSet() return the same operator - confirm.
+		// configure coGroup update function with name and broadcast variables
+		updates = updates.name("Vertex State Updates");
+		if (this.configuration != null) { // broadcast variables can only come from a supplied configuration
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) {
+				updates = updates.withBroadcastSet(e.f1, e.f0); // f1 = data set, f0 = broadcast variable name
+			}
+		}
+
+		// let the operator know that we preserve the key field
+		updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index 9122053..97a678c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -23,6 +23,7 @@ import java.util.Collection;
 
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
@@ -39,7 +40,34 @@ import org.apache.flink.util.Collector;
 public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
-	
+
+	// --------------------------------------------------------------------------------------------
+	//  Attributes that allow vertices to access their in/out degrees and the total number of vertices
+	//  inside an iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private long numberOfVertices;
+	/** @return the value last stored through setNumberOfVertices (0 if it was never called) */
+	public long getNumberOfVertices() {
+		return numberOfVertices;
+	}
+	/** Stores the number of vertices; package-private, so it is not callable from outside this package. */
+	void setNumberOfVertices(long numberOfVertices) {
+		this.numberOfVertices = numberOfVertices;
+	}
+
+	//---------------------------------------------------------------------------------------------
+
+	private boolean optDegrees;
+	/** @return the degrees flag; setNewVertexValue uses it to choose between the two output collectors */
+	public boolean isOptDegrees() {
+		return optDegrees;
+	}
+	/** Stores the degrees flag; package-private, so it is not callable from outside this package. */
+	void setOptDegrees(boolean optDegrees) {
+		this.optDegrees = optDegrees;
+	}
+
 	// --------------------------------------------------------------------------------------------
 	//  Public API Methods
 	// --------------------------------------------------------------------------------------------
@@ -49,13 +77,12 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
 	 * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
 	 * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
 	 * 
-	 * @param vertexKey The key (identifier) of the vertex.
-	 * @param vertexValue The value (state) of the vertex.
+	 * @param vertex The vertex.
 	 * @param inMessages The incoming messages to this vertex.
 	 * 
 	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
 	 */
-	public abstract void updateVertex(VertexKey vertexKey, VertexValue vertexValue, MessageIterator<Message> inMessages) throws Exception;
+	public abstract void updateVertex(Vertex<VertexKey, VertexValue> vertex, MessageIterator<Message> inMessages) throws Exception;
 	
 	/**
 	 * This method is executed one per superstep before the vertex update function is invoked for each vertex.
@@ -77,8 +104,13 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
 	 * @param newValue The new vertex value.
 	 */
 	public void setNewVertexValue(VertexValue newValue) {
-		outVal.f1 = newValue;
-		out.collect(outVal);
+		if(isOptDegrees()) { // degrees enabled: emit through the Tuple3-valued output
+			outValWithDegrees.f1.f0 = newValue; // replace only the user value, keep the two degree fields
+			outWithDegrees.collect(outValWithDegrees);
+		} else { // degrees disabled: emit the plain vertex
+			outVal.setValue(newValue);
+			out.collect(outVal);
+		}
 	}
 	
 	/**
@@ -128,18 +160,51 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
 	// --------------------------------------------------------------------------------------------
 	
 	private IterationRuntimeContext runtimeContext;
-	
+
 	private Collector<Vertex<VertexKey, VertexValue>> out;
 	
+	private Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> outWithDegrees;
+
 	private Vertex<VertexKey, VertexValue> outVal;
-	
-	
+
+	private Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> outValWithDegrees;
+
+
 	void init(IterationRuntimeContext context) {
 		this.runtimeContext = context;
 	}
-	
-	void setOutput(Vertex<VertexKey, VertexValue> val, Collector<Vertex<VertexKey, VertexValue>> out) {
+
+
+	/** Stores the vertex and collector that setNewVertexValue emits through when isOptDegrees() is true. */
+	void setOutputWithDegrees(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> outValWithDegrees,
+							Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> outWithDegrees) {
+		this.outValWithDegrees = outValWithDegrees;
+		this.outWithDegrees = outWithDegrees;
+	}
+	/** Stores the vertex and collector that setNewVertexValue emits through when isOptDegrees() is false. */
+	void setOutput(Vertex<VertexKey, VertexValue> outVal, Collector<Vertex<VertexKey, VertexValue>> out) {
+		this.outVal = outVal;
 		this.out = out;
-		this.outVal = val;
+	}
+
+	/**
+	 * Hides the Tuple3(actualValue, inDegree, outDegree) vertex value from the user; this function is
+	 * the one called from {@link org.apache.flink.graph.spargel.VertexCentricIteration} in that case.
+	 *
+	 * It builds a new vertex from the id and actual value in vertexState, sets its in- and out-degree, and
+	 * then calls the regular updateVertex function with it.
+	 *
+	 * @param vertexState the vertex whose value is the Tuple3(actualValue, inDegree, outDegree)
+	 * @param inMessages the incoming messages, passed on to updateVertex unchanged
+	 * @throws Exception whatever updateVertex throws
+	 */
+	void updateVertexFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertexState,
+												MessageIterator<Message> inMessages) throws Exception {
+		Vertex<VertexKey, VertexValue> vertex = new Vertex<VertexKey, VertexValue>(vertexState.getId(),
+				vertexState.getValue().f0);
+		vertex.setInDegree(vertexState.getValue().f1);
+		vertex.setOutDegree(vertexState.getValue().f2);
+
+		updateVertex(vertex, inMessages);
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
index d84952a..e93f581 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -22,6 +22,7 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
@@ -58,20 +59,20 @@ public class CollectionModeSuperstepITCase {
 	
 	public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
 		@Override
-		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
 			long superstep = getSuperstepNumber();
-			Assert.assertEquals(true, vertexValue == superstep);
-			setNewVertexValue(vertexValue + 1);
+			Assert.assertEquals(true, vertex.getValue() == superstep);
+			setNewVertexValue(vertex.getValue() + 1);
 		}
 	}
 	
 	public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> {
 		@Override
-		public void sendMessages(Long vertexId, Long vertexValue) {
+		public void sendMessages(Vertex<Long, Long> vertex) {
 			long superstep = getSuperstepNumber();
-			Assert.assertEquals(true, vertexValue == superstep);
+			Assert.assertEquals(true, vertex.getValue() == superstep);
 			//send message to keep vertices active
-			sendMessageToAllNeighbors(vertexValue);
+			sendMessageToAllNeighbors(vertex.getValue());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
index 4e8412c..35b8233 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -150,10 +150,10 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		}
 
 		@Override
-		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
 			long superstep = getSuperstepNumber();
 			aggregator.aggregate(superstep);
-			setNewVertexValue(vertexValue + 1);
+			setNewVertexValue(vertex.getValue() + 1);
 		}
 	}
 	
@@ -178,9 +178,9 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		}
 
 		@Override
-		public void sendMessages(Long vertexId, Long vertexValue) {
+		public void sendMessages(Vertex<Long, Long> vertex) {
 			//send message to keep vertices active
-			sendMessageToAllNeighbors(vertexValue);
+			sendMessageToAllNeighbors(vertex.getValue());
 		}
 	}
 
@@ -188,8 +188,8 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 	public static final class DummyUpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
 
 		@Override
-		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
-			setNewVertexValue(vertexValue + 1);
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+			setNewVertexValue(vertex.getValue() + 1);
 		}
 	}
 	
@@ -197,9 +197,9 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 	public static final class DummyMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
 
 		@Override
-		public void sendMessages(Long vertexId, Long vertexValue) {
+		public void sendMessages(Vertex<Long, Long> vertex) {
 			//send message to keep vertices active
-			sendMessageToAllNeighbors(vertexValue);
+			sendMessageToAllNeighbors(vertex.getValue());
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
index f2f3d8c..17a4cb0 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/ConnectedComponentsWithRandomisedEdgesITCase.java
@@ -66,7 +66,6 @@ public class ConnectedComponentsWithRandomisedEdgesITCase extends JavaProgramTes
 		result.writeAsCsv(resultPath, "\n", " ");
 		env.execute();
 	}
-
 	/**
 	 * A map function that takes a Long value and creates a 2-tuple out of it:
 	 * <pre>(Long value) -> (value, value)</pre>

http://git-wip-us.apache.org/repos/asf/flink/blob/585d27d0/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
new file mode 100644
index 0000000..58738d2
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/IncrementalSSSPITCase.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.IncrementalSSSPExample;
+import org.apache.flink.graph.example.utils.IncrementalSSSPData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class IncrementalSSSPITCase extends MultipleProgramsTestBase {
+	// Runs IncrementalSSSPExample on the IncrementalSSSPData fixtures and checks the written result.
+	private String verticesPath;
+
+	private String edgesPath;
+
+	private String edgesInSSSPPath;
+
+	private String resultPath;
+	// assigned by the test method, compared against the result file in after()
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public IncrementalSSSPITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+	// Writes the three input fixtures to temp files and records their URIs plus the result file URI.
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+		File verticesFile = tempFolder.newFile();
+		Files.write(IncrementalSSSPData.VERTICES, verticesFile, Charsets.UTF_8);
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(IncrementalSSSPData.EDGES, edgesFile, Charsets.UTF_8);
+
+		File edgesInSSSPFile = tempFolder.newFile();
+		Files.write(IncrementalSSSPData.EDGES_IN_SSSP, edgesInSSSPFile, Charsets.UTF_8);
+
+		verticesPath = verticesFile.toURI().toString();
+		edgesPath = edgesFile.toURI().toString();
+		edgesInSSSPPath = edgesInSSSPFile.toURI().toString();
+	}
+
+	@Test
+	public void testIncrementalSSSPExample() throws Exception {
+		IncrementalSSSPExample.main(new String[]{verticesPath, edgesPath, edgesInSSSPPath,
+				IncrementalSSSPData.EDGE_TO_BE_REMOVED, resultPath, IncrementalSSSPData.NUM_VERTICES + ""});
+		expected = IncrementalSSSPData.RESULTED_VERTICES;
+	}
+	// NOTE(review): if main() throws, 'expected' is still null when after() runs - confirm how compareResultsByLinesInMemory handles that.
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+}