You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/05/19 23:03:36 UTC

[07/10] flink git commit: [FLINK-1523] [gelly] added getIn/Out degrees methods in update and messaging functions; deleted VertexWithValue type; deleted InaccessibleMethodException; if the options are not set, -1 is returned; added missing jav

[FLINK-1523] [gelly] added getIn/Out degrees methods in update and messaging functions;
    deleted VertexWithValue type;
    deleted InaccessibleMethodException; if the options are not set, -1 is returned;
    added missing javadocs;
    added tests;
    renamed type parameters VertexKey -> K, VertexValue -> VV, EdgeValue -> EV.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e2f28d47
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e2f28d47
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e2f28d47

Branch: refs/heads/master
Commit: e2f28d47f6108c49c02ccba8ded7c70d39bab4bb
Parents: e172067
Author: vasia <va...@apache.org>
Authored: Mon May 11 15:21:53 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Tue May 19 22:38:04 2015 +0200

----------------------------------------------------------------------
 .../graph/InaccessibleMethodException.java      |  32 --
 .../apache/flink/graph/VertexWithDegrees.java   |  72 ----
 .../graph/example/IncrementalSSSPExample.java   |   7 +-
 .../library/ConnectedComponentsAlgorithm.java   |   9 +-
 .../flink/graph/spargel/MessagingFunction.java  | 107 +++---
 .../graph/spargel/VertexCentricIteration.java   | 330 +++++++++----------
 .../graph/spargel/VertexUpdateFunction.java     |  87 +++--
 .../test/CollectionModeSuperstepITCase.java     |   1 -
 .../test/GatherSumApplyConfigurationITCase.java |   1 -
 .../test/VertexCentricConfigurationITCase.java  | 285 ++++++++++------
 ...CentricConfigurationWithExceptionITCase.java | 190 -----------
 ...ctedComponentsWithRandomisedEdgesITCase.java |   1 +
 .../test/example/IncrementalSSSPITCase.java     |   4 +-
 13 files changed, 466 insertions(+), 660 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/InaccessibleMethodException.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/InaccessibleMethodException.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/InaccessibleMethodException.java
deleted file mode 100644
index de018de..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/InaccessibleMethodException.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-/**
- * The exception that gets thrown when the degree option or the number of vertices
- * option in {@link org.apache.flink.graph.spargel.IterationConfiguration} was not set.
- */
-public class InaccessibleMethodException extends Exception {
-
-	public InaccessibleMethodException() {}
-
-	public InaccessibleMethodException(String text) {
-		super(text);
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java
deleted file mode 100644
index b692475..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/VertexWithDegrees.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-import java.io.Serializable;
-
-/**
- * Represents the graph's nodes. It carries an ID and a value as well as the vertex inDegree and outDegree.
- * For vertices with no value, use {@link org.apache.flink.types.NullValue} as the value type.
- *
- * @param <K>
- * @param <V>
- */
-public class VertexWithDegrees<K extends Comparable<K> & Serializable, V extends Serializable>
-		extends Vertex<K, V> {
-
-	private long inDegree;
-
-	private long outDegree;
-
-	public VertexWithDegrees() {
-		super();
-		inDegree = -1l;
-		outDegree = -1l;
-	}
-
-	public VertexWithDegrees(K k, V v) {
-		super(k,v);
-		inDegree = 0l;
-		outDegree = 0l;
-	}
-
-	public Long getInDegree() throws Exception{
-		if(inDegree == -1) {
-			throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
-					"call iterationConfiguration.setOptDegrees(true).");
-		}
-		return inDegree;
-	}
-
-	public void setInDegree(Long inDegree) {
-		this.inDegree = inDegree;
-	}
-
-	public Long getOutDegree() throws Exception{
-		if(outDegree == -1) {
-			throw new InaccessibleMethodException("The degree option was not set. To access the degrees, " +
-					"call iterationConfiguration.setOptDegrees(true).");
-		}
-		return outDegree;
-	}
-
-	public void setOutDegree(Long outDegree) {
-		this.outDegree = outDegree;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
index ec43207..365518c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/IncrementalSSSPExample.java
@@ -26,11 +26,10 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.VertexWithDegrees;
 import org.apache.flink.graph.example.utils.IncrementalSSSPData;
-import org.apache.flink.graph.spargel.IterationConfiguration;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricConfiguration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
@@ -95,7 +94,7 @@ public class IncrementalSSSPExample implements ProgramDescription {
 		graph.removeEdge(edgeToBeRemoved);
 
 		// configure the iteration
-		IterationConfiguration parameters = new IterationConfiguration();
+		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
 
 		if(isInSSSP(edgeToBeRemoved, edgesInSSSP)) {
 
@@ -160,7 +159,7 @@ public class IncrementalSSSPExample implements ProgramDescription {
 		@Override
 		public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception {
 			if (inMessages.hasNext()) {
-				Long outDegree = ((VertexWithDegrees)vertex).getOutDegree() - 1;
+				Long outDegree = getOutDegree() - 1;
 				// check if the vertex has another SP-Edge
 				if (outDegree > 0) {
 					// there is another shortest path from the source to this vertex

http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
index a2ba2ac..7b536e5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponentsAlgorithm.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.library;
 
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
@@ -60,7 +61,7 @@ public class ConnectedComponentsAlgorithm implements GraphAlgorithm<Long, Long,
 	public static final class CCUpdater extends VertexUpdateFunction<Long, Long, Long> {
 
 		@Override
-		public void updateVertex(Long id, Long currentMin, MessageIterator<Long> messages) throws Exception {
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> messages) throws Exception {
 			long min = Long.MAX_VALUE;
 
 			for (long msg : messages) {
@@ -68,7 +69,7 @@ public class ConnectedComponentsAlgorithm implements GraphAlgorithm<Long, Long,
 			}
 
 			// update vertex value, if new minimum
-			if (min < currentMin) {
+			if (min < vertex.getValue()) {
 				setNewVertexValue(min);
 			}
 		}
@@ -80,9 +81,9 @@ public class ConnectedComponentsAlgorithm implements GraphAlgorithm<Long, Long,
 	public static final class CCMessenger extends MessagingFunction<Long, Long, Long, NullValue> {
 
 		@Override
-		public void sendMessages(Long id, Long currentMin) throws Exception {
+		public void sendMessages(Vertex<Long, Long> vertex) throws Exception {
 			// send current minimum to neighbors
-			sendMessageToAllNeighbors(currentMin);
+			sendMessageToAllNeighbors(vertex.getValue());
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
index b3092fd..4245c24 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -26,24 +26,21 @@ import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.InaccessibleMethodException;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.VertexWithDegrees;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
 /**
  * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
  * 
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
  * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
+ * @param <EV> The type of the values that are associated with the edges.
  */
-public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> implements Serializable {
+public abstract class MessagingFunction<K, VV, Message, EV> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
@@ -54,11 +51,12 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
 
 	private long numberOfVertices = -1L;
 
-	public long getNumberOfVertices() throws Exception{
-		if (numberOfVertices == -1) {
-			throw new InaccessibleMethodException("The number of vertices option is not set. " +
-					"To access the number of vertices, call iterationConfiguration.setOptNumVertices(true).");
-		}
+	/**
+	 * Retrieves the number of vertices in the graph.
+	 * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getNumberOfVertices() {
 		return numberOfVertices;
 	}
 
@@ -73,11 +71,15 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
 
 	private EdgeDirection direction;
 
+	/**
+	 * Retrieves the edge direction in which messages are propagated in the vertex-centric iteration.
+	 * @return the messaging {@link EdgeDirection}
+	 */
 	public EdgeDirection getDirection() {
 		return direction;
 	}
 
-	public void setDirection(EdgeDirection direction) {
+	void setDirection(EdgeDirection direction) {
 		this.direction = direction;
 	}
 
@@ -93,7 +95,7 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
 	 * 
 	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
 	 */
-	public abstract void sendMessages(Vertex<VertexKey, VertexValue> vertex) throws Exception;
+	public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
 	
 	/**
 	 * This method is executed one per superstep before the vertex update function is invoked for each vertex.
@@ -117,12 +119,12 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
 	 * @return An iterator with all outgoing edges.
 	 */
 	@SuppressWarnings("unchecked")
-	public Iterable<Edge<VertexKey, EdgeValue>> getEdges() {
+	public Iterable<Edge<K, EV>> getEdges() {
 		if (edgesUsed) {
 			throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllTargets()' exactly once.");
 		}
 		edgesUsed = true;
-		this.edgeIterator.set((Iterator<Edge<VertexKey, EdgeValue>>) edges);
+		this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
 		return this.edgeIterator;
 	}
 
@@ -143,7 +145,7 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
 		
 		while (edges.hasNext()) {
 			Tuple next = (Tuple) edges.next();
-			VertexKey k = next.getField(1);
+			K k = next.getField(1);
 			outValue.f0 = k;
 			out.collect(outValue);
 		}
@@ -156,7 +158,7 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
 	 * @param target The key (id) of the target vertex to message.
 	 * @param m The message.
 	 */
-	public void sendMessageTo(VertexKey target, Message m) {
+	public void sendMessageTo(K target, Message m) {
 		outValue.f0 = target;
 		outValue.f1 = m;
 		out.collect(outValue);
@@ -210,39 +212,42 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
 	//  internal methods and state
 	// --------------------------------------------------------------------------------------------
 	
-	private Tuple2<VertexKey, Message> outValue;
+	private Tuple2<K, Message> outValue;
 	
 	private IterationRuntimeContext runtimeContext;
 	
 	private Iterator<?> edges;
 	
-	private Collector<Tuple2<VertexKey, Message>> out;
+	private Collector<Tuple2<K, Message>> out;
 	
-	private EdgesIterator<VertexKey, EdgeValue> edgeIterator;
+	private EdgesIterator<K, EV> edgeIterator;
 	
 	private boolean edgesUsed;
-	
+
+	private long inDegree = -1;
+
+	private long outDegree = -1;
 	
 	void init(IterationRuntimeContext context) {
 		this.runtimeContext = context;
-		this.outValue = new Tuple2<VertexKey, Message>();
-		this.edgeIterator = new EdgesIterator<VertexKey, EdgeValue>();
+		this.outValue = new Tuple2<K, Message>();
+		this.edgeIterator = new EdgesIterator<K, EV>();
 	}
 	
-	void set(Iterator<?> edges, Collector<Tuple2<VertexKey, Message>> out) {
+	void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out) {
 		this.edges = edges;
 		this.out = out;
 		this.edgesUsed = false;
 	}
 	
-	private static final class EdgesIterator<VertexKey, EdgeValue> 
-		implements Iterator<Edge<VertexKey, EdgeValue>>, Iterable<Edge<VertexKey, EdgeValue>>
+	private static final class EdgesIterator<K, EV> 
+		implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
 	{
-		private Iterator<Edge<VertexKey, EdgeValue>> input;
+		private Iterator<Edge<K, EV>> input;
 		
-		private Edge<VertexKey, EdgeValue> edge = new Edge<VertexKey, EdgeValue>();
+		private Edge<K, EV> edge = new Edge<K, EV>();
 		
-		void set(Iterator<Edge<VertexKey, EdgeValue>> input) {
+		void set(Iterator<Edge<K, EV>> input) {
 			this.input = input;
 		}
 		
@@ -252,8 +257,8 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
 		}
 
 		@Override
-		public Edge<VertexKey, EdgeValue> next() {
-			Edge<VertexKey, EdgeValue> next = input.next();
+		public Edge<K, EV> next() {
+			Edge<K, EV> next = input.next();
 			edge.setSource(next.f0);
 			edge.setTarget(next.f1);
 			edge.setValue(next.f2);
@@ -265,28 +270,34 @@ public abstract class MessagingFunction<VertexKey, VertexValue, Message, EdgeVal
 			throw new UnsupportedOperationException();
 		}
 		@Override
-		public Iterator<Edge<VertexKey, EdgeValue>> iterator() {
+		public Iterator<Edge<K, EV>> iterator() {
 			return this;
 		}
 	}
 
 	/**
-	 * In order to hide the Tuple3(actualValue, inDegree, outDegree) vertex value from the user,
-	 * another function will be called from {@link org.apache.flink.graph.spargel.VertexCentricIteration}.
-	 *
-	 * This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling
-	 * the regular sendMessages function.
-	 *
-	 * @param newVertexState
-	 * @throws Exception
+	 * Retrieves the vertex in-degree (number of in-coming edges).
+	 * @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
+	 * option has been set; -1 otherwise. 
 	 */
-	void sendMessagesFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> newVertexState)
-			throws Exception {
-		VertexWithDegrees<VertexKey, VertexValue> vertex = new VertexWithDegrees<VertexKey, VertexValue>(newVertexState.getId(),
-				newVertexState.getValue().f0);
-		vertex.setInDegree(newVertexState.getValue().f1);
-		vertex.setOutDegree(newVertexState.getValue().f2);
+	public long getInDegree() {
+		return inDegree;
+	}
+
+	void setInDegree(long inDegree) {
+		this.inDegree = inDegree;
+	}
+
+	/**
+	 * Retrieve the vertex out-degree (number of out-going edges).
+	 * @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
+	 * option has been set; -1 otherwise. 
+	 */
+	public long getOutDegree() {
+		return outDegree;
+	}
 
-		sendMessages(vertex);
+	void setOutDegree(long outDegree) {
+		this.outDegree = outDegree;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index 915a3ee..f8e64b6 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -29,10 +29,8 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
-import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -43,7 +41,6 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.VertexWithDegrees;
 import org.apache.flink.util.Collector;
 
 import com.google.common.base.Preconditions;
@@ -72,34 +69,33 @@ import com.google.common.base.Preconditions;
  * Vertex-centric graph iterations are are run by calling
  * {@link Graph#runVertexCentricIteration(VertexUpdateFunction, MessagingFunction, int)}.
  *
- * @param <VertexKey> The type of the vertex key (the vertex identifier).
- * @param <VertexValue> The type of the vertex value (the state of the vertex).
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
  * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EdgeValue> The type of the values that are associated with the edges.
+ * @param <EV> The type of the values that are associated with the edges.
  */
-public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue> 
-	implements CustomUnaryOperation<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
+public class VertexCentricIteration<K, VV, Message, EV> 
+	implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>>
 {
-	private final VertexUpdateFunction<VertexKey, VertexValue, Message> updateFunction;
+	private final VertexUpdateFunction<K, VV, Message> updateFunction;
 
-	private final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
+	private final MessagingFunction<K, VV, Message, EV> messagingFunction;
 	
-	private final DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue;
+	private final DataSet<Edge<K, EV>> edgesWithValue;
 	
 	private final int maximumNumberOfIterations;
 	
 	private final TypeInformation<Message> messageType;
 	
-	private DataSet<Vertex<VertexKey, VertexValue>> initialVertices;
+	private DataSet<Vertex<K, VV>> initialVertices;
 
 	private VertexCentricConfiguration configuration;
 
-	private DataSet<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> verticesWithDegrees;
 	// ----------------------------------------------------------------------------------
 	
-	private VertexCentricIteration(VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
-			MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
-			DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue, 
+	private VertexCentricIteration(VertexUpdateFunction<K, VV, Message> uf,
+			MessagingFunction<K, VV, Message, EV> mf,
+			DataSet<Edge<K, EV>> edgesWithValue, 
 			int maximumNumberOfIterations)
 	{
 		Preconditions.checkNotNull(uf);
@@ -114,7 +110,7 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 		this.messageType = getMessageType(mf);
 	}
 	
-	private TypeInformation<Message> getMessageType(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf) {
+	private TypeInformation<Message> getMessageType(MessagingFunction<K, VV, Message, EV> mf) {
 		return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
 	}
 	
@@ -132,7 +128,7 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	 * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
 	 */
 	@Override
-	public void setInput(DataSet<Vertex<VertexKey, VertexValue>> inputData) {
+	public void setInput(DataSet<Vertex<K, VV>> inputData) {
 		this.initialVertices = inputData;
 	}
 	
@@ -142,17 +138,17 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	 * @return The operator that represents this vertex-centric graph computation.
 	 */
 	@Override
-	public DataSet<Vertex<VertexKey, VertexValue>> createResult() {
+	public DataSet<Vertex<K, VV>> createResult() {
 		if (this.initialVertices == null) {
 			throw new IllegalStateException("The input data set has not been set.");
 		}
 
 		// prepare some type information
-		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
-		TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
+		TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
+		TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<K,Message>>(keyType, messageType);
 
 		// create a graph
-		Graph<VertexKey, VertexValue, EdgeValue> graph =
+		Graph<K, VV, EV> graph =
 				Graph.fromDataSet(initialVertices, edgesWithValue, ExecutionEnvironment.getExecutionEnvironment());
 
 		// check whether the numVertices option is set and, if so, compute the total number of vertices
@@ -194,21 +190,21 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	 * @param uf The function that updates the state of the vertices from the incoming messages.
 	 * @param mf The function that turns changed vertex states into messages along the edges.
 	 * 
-	 * @param <VertexKey> The type of the vertex key (the vertex identifier).
-	 * @param <VertexValue> The type of the vertex value (the state of the vertex).
+	 * @param <K> The type of the vertex key (the vertex identifier).
+	 * @param <VV> The type of the vertex value (the state of the vertex).
 	 * @param <Message> The type of the message sent between vertices along the edges.
-	 * @param <EdgeValue> The type of the values that are associated with the edges.
+	 * @param <EV> The type of the values that are associated with the edges.
 	 * 
 	 * @return An in stance of the vertex-centric graph computation operator.
 	 */
-	public static final <VertexKey, VertexValue, Message, EdgeValue>
-			VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue> withEdges(
-					DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue,
-					VertexUpdateFunction<VertexKey, VertexValue, Message> uf,
-					MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> mf,
+	public static final <K, VV, Message, EV>
+			VertexCentricIteration<K, VV, Message, EV> withEdges(
+					DataSet<Edge<K, EV>> edgesWithValue,
+					VertexUpdateFunction<K, VV, Message> uf,
+					MessagingFunction<K, VV, Message, EV> mf,
 					int maximumNumberOfIterations)
 	{
-		return new VertexCentricIteration<VertexKey, VertexValue, Message, EdgeValue>(uf, mf, edgesWithValue, maximumNumberOfIterations);
+		return new VertexCentricIteration<K, VV, Message, EV>(uf, mf, edgesWithValue, maximumNumberOfIterations);
 	}
 
 	/**
@@ -231,21 +227,21 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	//  Wrapping UDFs
 	// --------------------------------------------------------------------------------------------
 
-	private static final class VertexUpdateUdf<VertexKey, VertexValue, Message> 
-		extends RichCoGroupFunction<Tuple2<VertexKey, Message>, Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>>
-		implements ResultTypeQueryable<Vertex<VertexKey, VertexValue>>
+	private static abstract class VertexUpdateUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<
+		Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>>
+		implements ResultTypeQueryable<Vertex<K, VVWithDegrees>>
 	{
 		private static final long serialVersionUID = 1L;
 		
-		final VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction;
+		final VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction;
 
 		final MessageIterator<Message> messageIter = new MessageIterator<Message>();
 		
-		private transient TypeInformation<Vertex<VertexKey, VV>> resultType;
+		private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType;
 		
 		
-		private VertexUpdateUdf(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction,
-				TypeInformation<Vertex<VertexKey, VV>> resultType)
+		private VertexUpdateUdf(VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction,
+				TypeInformation<Vertex<K, VVWithDegrees>> resultType)
 		{
 			this.vertexUpdateFunction = vertexUpdateFunction;
 			this.resultType = resultType;
@@ -265,27 +261,26 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 		}
 
 		@Override
-		public TypeInformation<Vertex<VertexKey, VV>> getProducedType() {
+		public TypeInformation<Vertex<K, VVWithDegrees>> getProducedType() {
 			return this.resultType;
 		}
 	}
 
-	private static final class VertexUpdateUdfSimpleVertexValue<VertexKey, VertexValue, Message>
-		extends VertexUpdateUdf<VertexKey, VertexValue, Message> {
+	@SuppressWarnings("serial")
+	private static final class VertexUpdateUdfSimpleVV<K, VV, Message> extends VertexUpdateUdf<K, VV, Message> {
 
-
-		private VertexUpdateUdfSimpleVertexValue(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, TypeInformation<Vertex<VertexKey, VertexValue>> resultType) {
+		private VertexUpdateUdfSimpleVV(VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, TypeInformation<Vertex<K, VV>> resultType) {
 			super(vertexUpdateFunction, resultType);
 		}
 
 		@Override
-		public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages,
-							Iterable<Vertex<VertexKey, VertexValue>> vertex,
-							Collector<Vertex<VertexKey, VertexValue>> out) throws Exception {
-			final Iterator<Vertex<VertexKey, VertexValue>> vertexIter = vertex.iterator();
+		public void coGroup(Iterable<Tuple2<K, Message>> messages,
+							Iterable<Vertex<K, VV>> vertex,
+							Collector<Vertex<K, VV>> out) throws Exception {
+			final Iterator<Vertex<K, VV>> vertexIter = vertex.iterator();
 
 			if (vertexIter.hasNext()) {
-				Vertex<VertexKey, VertexValue> vertexState = vertexIter.next();
+				Vertex<K, VV> vertexState = vertexIter.next();
 
 				@SuppressWarnings("unchecked")
 				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
@@ -295,11 +290,11 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 				vertexUpdateFunction.updateVertex(vertexState, messageIter);
 			}
 			else {
-				final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
+				final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
 				if (messageIter.hasNext()) {
 					String message = "Target vertex does not exist!.";
 					try {
-						Tuple2<VertexKey, Message> next = messageIter.next();
+						Tuple2<K, Message> next = messageIter.next();
 						message = "Target vertex '" + next.f0 + "' does not exist!.";
 					} catch (Throwable t) {}
 					throw new Exception(message);
@@ -310,36 +305,39 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 		}
 	}
 
-	private static final class VertexUpdateUdfVertexValueWithDegrees<VertexKey,	VertexValue, Message> extends VertexUpdateUdf<VertexKey,
-			Tuple3<VertexValue, Long, Long>, VertexValue, Message> {
-
+	@SuppressWarnings("serial")
+	private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> {
 
-		private VertexUpdateUdfVertexValueWithDegrees(VertexUpdateFunction<VertexKey, VertexValue, Message> vertexUpdateFunction, TypeInformation<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> resultType) {
+		private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message> vertexUpdateFunction,
+				TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> resultType) {
 			super(vertexUpdateFunction, resultType);
 		}
-
+		
 		@Override
-		public void coGroup(Iterable<Tuple2<VertexKey, Message>> messages,
-							Iterable<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertex,
-							Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> out) throws Exception {
-			final Iterator<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertexIter = vertex.iterator();
+		public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> vertex,
+							Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
 
+			final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertexIter = vertex.iterator();
+		
 			if (vertexIter.hasNext()) {
-				Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertexState = vertexIter.next();
-
+				Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = vertexIter.next();
+		
 				@SuppressWarnings("unchecked")
 				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
 				messageIter.setSource(downcastIter);
 
-				vertexUpdateFunction.setOutputWithDegrees(vertexState, out);
-				vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexState, messageIter);
+				vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1);
+				vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2);
+
+				vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
+				vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexWithDegrees, messageIter);
 			}
 			else {
-				final Iterator<Tuple2<VertexKey, Message>> messageIter = messages.iterator();
+				final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
 				if (messageIter.hasNext()) {
 					String message = "Target vertex does not exist!.";
 					try {
-						Tuple2<VertexKey, Message> next = messageIter.next();
+						Tuple2<K, Message> next = messageIter.next();
 						message = "Target vertex '" + next.f0 + "' does not exist!.";
 					} catch (Throwable t) {}
 					throw new Exception(message);
@@ -353,19 +351,19 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	/*
 	 * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
 	 */
-	private static final class MessagingUdfWithEdgeValues<VertexKey, VertexValue, Message, EdgeValue> 
-		extends RichCoGroupFunction<Edge<VertexKey, EdgeValue>, Vertex<VertexKey, VertexValue>, Tuple2<VertexKey, Message>>
-		implements ResultTypeQueryable<Tuple2<VertexKey, Message>>
+	private static abstract class MessagingUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
+		extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
+		implements ResultTypeQueryable<Tuple2<K, Message>>
 	{
 		private static final long serialVersionUID = 1L;
 		
-		final MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction;
+		final MessagingFunction<K, VV, Message, EV> messagingFunction;
 		
-		private transient TypeInformation<Tuple2<VertexKey, Message>> resultType;
-
-
-		private MessagingUdfWithEdgeValues(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
-				TypeInformation<Tuple2<VertexKey, Message>> resultType)
+		private transient TypeInformation<Tuple2<K, Message>> resultType;
+	
+	
+		private MessagingUdfWithEdgeValues(MessagingFunction<K, VV, Message, EV> messagingFunction,
+				TypeInformation<Tuple2<K, Message>> resultType)
 		{
 			this.messagingFunction = messagingFunction;
 			this.resultType = resultType;
@@ -386,54 +384,62 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 		}
 		
 		@Override
-		public TypeInformation<Tuple2<VertexKey, Message>> getProducedType() {
+		public TypeInformation<Tuple2<K, Message>> getProducedType() {
 			return this.resultType;
 		}
 	}
 
-	private static final class MessagingUdfWithEdgeValuesSimpleVertexValue<VertexKey, VertexValue, Message, EdgeValue>
-			extends MessagingUdfWithEdgeValues<VertexKey, VertexValue, VertexValue, Message, EdgeValue> {
+	@SuppressWarnings("serial")
+	private static final class MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>
+		extends MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> {
 
-		private MessagingUdfWithEdgeValuesSimpleVertexValue(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
-															TypeInformation<Tuple2<VertexKey, Message>> resultType) {
+		private MessagingUdfWithEVsSimpleVV(MessagingFunction<K, VV, Message, EV> messagingFunction,
+			TypeInformation<Tuple2<K, Message>> resultType) {
 			super(messagingFunction, resultType);
 		}
 
 		@Override
-		public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
-							Iterable<Vertex<VertexKey, VertexValue>> state,
-							Collector<Tuple2<VertexKey, Message>> out) throws Exception {
-			final Iterator<Vertex<VertexKey, VertexValue>> stateIter = state.iterator();
-
+		public void coGroup(Iterable<Edge<K, EV>> edges,
+							Iterable<Vertex<K, VV>> state,
+							Collector<Tuple2<K, Message>> out) throws Exception {
+			final Iterator<Vertex<K, VV>> stateIter = state.iterator();
+		
 			if (stateIter.hasNext()) {
-				Vertex<VertexKey, VertexValue> newVertexState = stateIter.next();
+				Vertex<K, VV> newVertexState = stateIter.next();
 				messagingFunction.set((Iterator<?>) edges.iterator(), out);
 				messagingFunction.sendMessages(newVertexState);
 			}
 		}
 	}
 
-	private static final class MessagingUdfWithEdgeValuesVertexValueWithDegrees<VertexKey, VertexValue, Message, EdgeValue>
-			extends MessagingUdfWithEdgeValues<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message, EdgeValue> {
+	@SuppressWarnings("serial")
+	private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>
+		extends MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> {
 
+		private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
 
-		private MessagingUdfWithEdgeValuesVertexValueWithDegrees
-				(MessagingFunction<VertexKey, VertexValue, Message, EdgeValue> messagingFunction,
-				TypeInformation<Tuple2<VertexKey, Message>> resultType) {
+		private MessagingUdfWithEVsVVWithDegrees(MessagingFunction<K, VV, Message, EV> messagingFunction,
+				TypeInformation<Tuple2<K, Message>> resultType) {
 			super(messagingFunction, resultType);
 		}
 
 		@Override
-		public void coGroup(Iterable<Edge<VertexKey, EdgeValue>> edges,
-							Iterable<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> state,
-							Collector<Tuple2<VertexKey, Message>> out) throws Exception {
-
-			final Iterator<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> stateIter = state.iterator();
+		public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> state,
+				Collector<Tuple2<K, Message>> out) throws Exception {
 
+			final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> stateIter = state.iterator();
+		
 			if (stateIter.hasNext()) {
-				Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> newVertexState = stateIter.next();
+				Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = stateIter.next();
+
+				nextVertex.setField(vertexWithDegrees.f0, 0);
+				nextVertex.setField(vertexWithDegrees.f1.f0, 1);
+
+				messagingFunction.setInDegree(vertexWithDegrees.f1.f1);
+				messagingFunction.setOutDegree(vertexWithDegrees.f1.f2);
+
 				messagingFunction.set((Iterator<?>) edges.iterator(), out);
-				messagingFunction.sendMessagesFromVertexCentricIteration(newVertexState);
+				messagingFunction.sendMessages(nextVertex);
 			}
 		}
 	}
@@ -454,15 +460,14 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	 * @param equalToArg the argument for the equalTo within the coGroup
 	 * @return the messaging function
 	 */
-	private CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> buildMessagingFunction(
-			DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration,
-			TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction(
+			DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration,
+			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
 
 		// build the messaging function (co group)
-		CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
-		MessagingUdfWithEdgeValues<VertexKey, VertexValue, VertexValue, Message, EdgeValue> messenger =
-				new MessagingUdfWithEdgeValuesSimpleVertexValue<VertexKey, VertexValue, Message, EdgeValue>(
-						messagingFunction, messageTypeInfo);
+		CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
+		MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
+				new MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
 
 		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
 				.equalTo(equalToArg).with(messenger);
@@ -489,16 +494,14 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	 * @param equalToArg the argument for the equalTo within the coGroup
 	 * @return the messaging function
 	 */
-	private CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> buildMessagingFunctionVerticesWithDegrees(
-			DeltaIteration<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>,
-					Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> iteration,
-			TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
+			DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration,
+			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
 
 		// build the messaging function (co group)
-		CoGroupOperator<?, ?, Tuple2<VertexKey, Message>> messages;
-		MessagingUdfWithEdgeValues<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message, EdgeValue> messenger =
-				new MessagingUdfWithEdgeValuesVertexValueWithDegrees<VertexKey, VertexValue, Message, EdgeValue>(
-						messagingFunction, messageTypeInfo);
+		CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
+		MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> messenger =
+				new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
 
 		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
 				.equalTo(equalToArg).with(messenger);
@@ -518,17 +521,10 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	/**
 	 * Helper method which sets up an iteration with the given vertex value(either simple or with degrees)
 	 *
-	 * @param vertices
-	 * @param <VV>
+	 * @param iteration
 	 */
 
-	private <VV> DeltaIteration<Vertex<VertexKey, VV>, Vertex<VertexKey, VV>> setUpIteration(
-			DataSet<Vertex<VertexKey, VV>> vertices) {
-
-		final int[] zeroKeyPos = new int[] {0};
-
-		final DeltaIteration<Vertex<VertexKey, VV>, Vertex<VertexKey, VV>> iteration =
-				vertices.iterateDelta(vertices, this.maximumNumberOfIterations, zeroKeyPos);
+	private void setUpIteration(DeltaIteration<?, ?> iteration) {
 
 		// set up the iteration operator
 		if (this.configuration != null) {
@@ -546,8 +542,6 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 			// no configuration provided; set default name
 			iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
 		}
-
-		return iteration;
 	}
 
 	/**
@@ -557,14 +551,16 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	 * @param messageTypeInfo
 	 * @return the operator
 	 */
-	private DataSet<Vertex<VertexKey, VertexValue>> createResultSimpleVertex(EdgeDirection messagingDirection,
-																			TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo) {
-		DataSet<Tuple2<VertexKey, Message>> messages;
+	private DataSet<Vertex<K, VV>> createResultSimpleVertex(EdgeDirection messagingDirection,
+		TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
+
+		DataSet<Tuple2<K, Message>> messages;
 
-		TypeInformation<Vertex<VertexKey, VertexValue>> vertexTypes = initialVertices.getType();
+		TypeInformation<Vertex<K, VV>> vertexTypes = initialVertices.getType();
 
-		final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
-				setUpIteration(this.initialVertices);
+		final DeltaIteration<Vertex<K, VV>,	Vertex<K, VV>> iteration =
+				initialVertices.iterateDelta(initialVertices, this.maximumNumberOfIterations, 0);
+				setUpIteration(iteration);
 
 		switch (messagingDirection) {
 			case IN:
@@ -581,11 +577,11 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
 
-		VertexUpdateUdf<VertexKey, VertexValue, VertexValue, Message> updateUdf =
-				new VertexUpdateUdfSimpleVertexValue<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
+		VertexUpdateUdf<K, VV, Message> updateUdf =
+				new VertexUpdateUdfSimpleVV<K, VV, Message>(updateFunction, vertexTypes);
 
 		// build the update function (co group)
-		CoGroupOperator<?, ?, Vertex<VertexKey, VertexValue>> updates =
+		CoGroupOperator<?, ?, Vertex<K, VV>> updates =
 				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
 
 		configureUpdateFunction(updates);
@@ -602,46 +598,44 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 	 * @param messageTypeInfo
 	 * @return the operator
 	 */
-	private DataSet<Vertex<VertexKey, VertexValue>> createResultVerticesWithDegrees(
-			Graph<VertexKey, VertexValue, EdgeValue> graph,
-			EdgeDirection messagingDirection,
-			TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo) {
+	@SuppressWarnings("serial")
+	private DataSet<Vertex<K, VV>> createResultVerticesWithDegrees(Graph<K, VV, EV> graph, EdgeDirection messagingDirection,
+			TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
 
-		DataSet<Tuple2<VertexKey, Message>> messages;
+		DataSet<Tuple2<K, Message>> messages;
 
 		this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
 
-		DataSet<Tuple2<VertexKey, Long>> inDegrees = graph.inDegrees();
-		DataSet<Tuple2<VertexKey, Long>> outDegrees = graph.outDegrees();
+		DataSet<Tuple2<K, Long>> inDegrees = graph.inDegrees();
+		DataSet<Tuple2<K, Long>> outDegrees = graph.outDegrees();
 
-		DataSet<Tuple3<VertexKey, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
-				.with(new FlatJoinFunction<Tuple2<VertexKey, Long>, Tuple2<VertexKey, Long>, Tuple3<VertexKey, Long, Long>>() {
+		DataSet<Tuple3<K, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
+				.with(new FlatJoinFunction<Tuple2<K, Long>, Tuple2<K, Long>, Tuple3<K, Long, Long>>() {
 
 					@Override
-					public void join(Tuple2<VertexKey, Long> first, Tuple2<VertexKey, Long> second, Collector<Tuple3<VertexKey, Long, Long>> out) throws Exception {
-						out.collect(new Tuple3<VertexKey, Long, Long>(first.f0, first.f1, second.f1));
+					public void join(Tuple2<K, Long> first, Tuple2<K, Long> second,	Collector<Tuple3<K, Long, Long>> out) {
+						out.collect(new Tuple3<K, Long, Long>(first.f0, first.f1, second.f1));
 					}
-				});
+				}).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
 
-		DataSet<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> verticesWithDegrees= initialVertices
+		DataSet<Vertex<K, Tuple3<VV, Long, Long>>> verticesWithDegrees = initialVertices
 				.join(degrees).where(0).equalTo(0)
-				.with(new FlatJoinFunction<Vertex<VertexKey,VertexValue>, Tuple3<VertexKey,Long,Long>, Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>>() {
+				.with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K,Long,Long>, Vertex<K, Tuple3<VV, Long, Long>>>() {
 					@Override
-					public void join(Vertex<VertexKey, VertexValue> vertex,
-									Tuple3<VertexKey, Long, Long> degrees,
-									Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> out) throws Exception {
+					public void join(Vertex<K, VV> vertex, Tuple3<K, Long, Long> degrees,
+									Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
 
-						out.collect(new VertexWithDegrees<VertexKey, Tuple3<VertexValue, Long, Long>>(vertex.getId(),
-								new Tuple3<VertexValue, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
+						out.collect(new Vertex<K, Tuple3<VV, Long, Long>>(vertex.getId(),
+								new Tuple3<VV, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
 					}
-				});
+				}).withForwardedFieldsFirst("f0");
 
 		// add type info
-		TypeInformation<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
+		TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
 
-		final DeltaIteration<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>,
-				Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> iteration =
-				setUpIteration(verticesWithDegrees);
+		final DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>,	Vertex<K, Tuple3<VV, Long, Long>>> iteration =
+				verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0);
+				setUpIteration(iteration);
 
 		switch (messagingDirection) {
 			case IN:
@@ -658,24 +652,26 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
 
-		VertexUpdateUdf<VertexKey, Tuple3<VertexValue, Long, Long>, VertexValue, Message> updateUdf =
-				new VertexUpdateUdfVertexValueWithDegrees<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> updateUdf =
+				new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
 
 		// build the update function (co group)
-		CoGroupOperator<?, ?, Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> updates =
+		CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
 				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
 
 		configureUpdateFunction(updates);
 
-		return iteration.closeWith(updates, updates).map(new MapFunction<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>, Vertex<VertexKey, VertexValue>>() {
-			@Override
-			public Vertex<VertexKey, VertexValue> map(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertex) throws Exception {
-				return new Vertex<VertexKey, VertexValue>(vertex.getId(), vertex.getValue().f0);
-			}
-		});
+		return iteration.closeWith(updates, updates).map(
+				new MapFunction<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, VV>>() {
+
+					public Vertex<K, VV> map(Vertex<K, Tuple3<VV, Long, Long>> vertex) {
+						return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
+					}
+				});
 	}
 
-	private <VV> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<VertexKey, VV>> updates) {
+	private <VVWithDegree> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<K, VVWithDegree>> updates) {
 
 		// configure coGroup update function with name and broadcast variables
 		updates = updates.name("Vertex State Updates");
@@ -688,4 +684,4 @@ public class VertexCentricIteration<VertexKey, VertexValue,	Message, EdgeValue>
 		// let the operator know that we preserve the key field
 		updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index bc2e857..9930b50 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -24,9 +24,7 @@ import java.util.Collection;
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.IterationRuntimeContext;
 import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.graph.InaccessibleMethodException;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.VertexWithDegrees;
 import org.apache.flink.types.Value;
 import org.apache.flink.util.Collector;
 
@@ -35,11 +33,11 @@ import org.apache.flink.util.Collector;
  * incoming messages. The central method is {@link #updateVertex(Comparable, Object, MessageIterator)}, which is
  * invoked once per vertex per superstep.
  * 
- * <VertexKey> The vertex key type.
- * <VertexValue> The vertex value type.
+ * <K> The vertex key type.
+ * <VV> The vertex value type.
  * <Message> The message type.
  */
-public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> implements Serializable {
+public abstract class VertexUpdateFunction<K, VV, Message> implements Serializable {
 
 	private static final long serialVersionUID = 1L;
 
@@ -50,11 +48,12 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
 
 	private long numberOfVertices = -1L;
 
-	public long getNumberOfVertices() throws Exception{
-		if (numberOfVertices == -1) {
-			throw new InaccessibleMethodException("The number of vertices option is not set. " +
-					"To access the number of vertices, call iterationConfiguration.setOptNumVertices(true).");
-		}
+	/**
+	 * Retrieves the number of vertices in the graph.
+	 * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getNumberOfVertices() {
 		return numberOfVertices;
 	}
 
@@ -66,7 +65,7 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
 
 	private boolean optDegrees;
 
-	public boolean isOptDegrees() {
+	boolean isOptDegrees() {
 		return optDegrees;
 	}
 
@@ -80,7 +79,7 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
 	
 	/**
 	 * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
-	 * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
+	 * the incoming messages. It may set a new vertex state via {@link #setNewVV(Object)}. If the vertex
 	 * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
 	 * 
 	 * @param vertex The vertex.
@@ -88,7 +87,7 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
 	 * 
 	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
 	 */
-	public abstract void updateVertex(Vertex<VertexKey, VertexValue> vertex, MessageIterator<Message> inMessages) throws Exception;
+	public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception;
 	
 	/**
 	 * This method is executed one per superstep before the vertex update function is invoked for each vertex.
@@ -109,7 +108,7 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
 	 * 
 	 * @param newValue The new vertex value.
 	 */
-	public void setNewVertexValue(VertexValue newValue) {
+	public void setNewVertexValue(VV newValue) {
 		if(isOptDegrees()) {
 			outValWithDegrees.f1.f0 = newValue;
 			outWithDegrees.collect(outValWithDegrees);
@@ -167,30 +166,58 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
 	
 	private IterationRuntimeContext runtimeContext;
 
-	private Collector<Vertex<VertexKey, VertexValue>> out;
+	private Collector<Vertex<K, VV>> out;
 	
-	private Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> outWithDegrees;
+	private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees;
 
-	private Vertex<VertexKey, VertexValue> outVal;
+	private Vertex<K, VV> outVal;
 
-	private Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> outValWithDegrees;
+	private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees;
 
+	private long inDegree = -1;
+
+	private long outDegree = -1;
 
 	void init(IterationRuntimeContext context) {
 		this.runtimeContext = context;
 	}
 
+	void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
+		this.outVal = outVal;
+		this.out = out;
+	}
 
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	<ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal,
+			Collector out) {
+		this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
+		this.outWithDegrees = out;
+	}
 
-	void setOutputWithDegrees(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> outValWithDegrees,
-							Collector<Vertex<VertexKey, Tuple3<VertexValue, Long, Long>>> outWithDegrees) {
-		this.outValWithDegrees = outValWithDegrees;
-		this.outWithDegrees = outWithDegrees;
+	/**
+	 * Retrieves the vertex in-degree (number of in-coming edges).
+	 * @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
+	 * option has been set; -1 otherwise. 
+	 */
+	public long getInDegree() {
+		return inDegree;
 	}
 
-	void setOutput(Vertex<VertexKey, VertexValue> outVal, Collector<Vertex<VertexKey, VertexValue>> out) {
-		this.outVal = outVal;
-		this.out = out;
+	void setInDegree(long inDegree) {
+		this.inDegree = inDegree;
+	}
+
+	/**
+	 * Retrieve the vertex out-degree (number of out-going edges).
+	 * @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
+	 * option has been set; -1 otherwise. 
+	 */
+	public long getOutDegree() {
+		return outDegree;
+	}
+
+	void setOutDegree(long outDegree) {
+		this.outDegree = outDegree;
 	}
 
 	/**
@@ -204,12 +231,12 @@ public abstract class VertexUpdateFunction<VertexKey, VertexValue, Message> impl
 	 * @param inMessages
 	 * @throws Exception
 	 */
-	void updateVertexFromVertexCentricIteration(Vertex<VertexKey, Tuple3<VertexValue, Long, Long>> vertexState,
+	@SuppressWarnings("unchecked")
+	<VertexWithDegree> void updateVertexFromVertexCentricIteration(Vertex<K, VertexWithDegree> vertexState,
 												MessageIterator<Message> inMessages) throws Exception {
-		VertexWithDegrees<VertexKey, VertexValue> vertex = new VertexWithDegrees<VertexKey, VertexValue>(vertexState.getId(),
-				vertexState.getValue().f0);
-		vertex.setInDegree(vertexState.getValue().f1);
-		vertex.setOutDegree(vertexState.getValue().f2);
+
+		Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
+				((Tuple3<VV, Long, Long>)vertexState.getValue()).f0);
 
 		updateVertex(vertex, inMessages);
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
index e93f581..3fbd0bc 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.io.DiscardingOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;

http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
index 5f5f8b2..ca5d5d9 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/GatherSumApplyConfigurationITCase.java
@@ -31,7 +31,6 @@ import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.GatherSumApplyIteration;
 import org.apache.flink.graph.gsa.Neighbor;
 import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.IterationConfiguration;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
 import org.apache.flink.types.LongValue;
 import org.junit.After;

http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
index 1c36906..567c015 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -26,13 +26,10 @@ import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.IterationConfiguration;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexCentricConfiguration;
@@ -48,6 +45,7 @@ import org.junit.Test;
 import org.junit.rules.TemporaryFolder;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
 
 @RunWith(Parameterized.class)
 public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
@@ -88,6 +86,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 		parameters.addBroadcastSetForUpdateFunction("updateBcastSet", env.fromElements(1, 2, 3));
 		parameters.addBroadcastSetForMessagingFunction("messagingBcastSet", env.fromElements(4, 5, 6));
 		parameters.registerAggregator("superstepAggregator", new LongSumAggregator());
+		parameters.setOptNumVertices(true);
 
 		Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
 				new UpdateFunction(), new MessageFunction(), 10, parameters);
@@ -136,6 +135,29 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testDefaultConfiguration() throws Exception {
+		/*
+		 * Test Graph's runVertexCentricIteration when configuration parameters are not provided
+		 * i.e. degrees and numVertices will be -1, EdgeDirection will be OUT.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), 
+				TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
+
+		Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
+				new UpdateFunctionDefault(), new MessageFunctionDefault(), 5);
+
+		result.getVertices().map(new VertexToTuple2Map<Long, Long>()).writeAsCsv(resultPath, "\n", "\t");
+		env.execute();
+		expectedResult = "1	6\n" +
+						"2	6\n" +
+						"3	6\n" +
+						"4	6\n" +
+						"5	6";
+	}
+
+	@Test
 	public void testIterationDefaultDirection() throws Exception {
 
 		/*
@@ -176,7 +198,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 				.mapVertices(new InitialiseHashSetMapper());
 
 		// configure the iteration
-		IterationConfiguration parameters = new IterationConfiguration();
+		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
 
 		parameters.setDirection(EdgeDirection.IN);
 
@@ -208,7 +230,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 				.mapVertices(new InitialiseHashSetMapper());
 
 		// configure the iteration
-		IterationConfiguration parameters = new IterationConfiguration();
+		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
 
 		parameters.setDirection(EdgeDirection.ALL);
 
@@ -227,11 +249,36 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testNumVerticesNotSet() throws Exception {
+
+		/*
+		 * Test that if the number of vertices option is not set, -1 is returned as value.
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
+				TestGraphUtils.getLongLongEdges(), env);
+
+		DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
+				new DummyMessageFunction(), 2).getVertices();
+
+		verticesWithNumVertices.writeAsCsv(resultPath, "\n", "\t");
+		env.execute();
+
+		expectedResult = "1	-1\n" +
+				"2	-1\n" +
+				"3	-1\n" +
+				"4	-1\n" +
+				"5	-1";
+	}
+
+	@Test
 	public void testInDegreesSet() throws Exception {
 
 		/*
-		 * Test that if the degrees are set, the in degrees can be accessed in every superstep and the value
-		 * is correctly computed.
+		 * Test that if the degrees are set, they can be accessed in every superstep 
+		 * inside the update function and the value
+		 * is correctly computed for degrees in the messaging function.
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -239,14 +286,14 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdges(), env);
 
 		// configure the iteration
-		IterationConfiguration parameters = new IterationConfiguration();
+		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
 
 		parameters.setOptDegrees(true);
 
-		DataSet<Vertex<Long, Long>> verticesWithInDegree = graph.runVertexCentricIteration(new UpdateFunctionInDegree(),
-				new DummyMessageFunction(), 5, parameters).getVertices();
+		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
+				new UpdateFunctionInDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
 
-		verticesWithInDegree.writeAsCsv(resultPath, "\n", "\t");
+		verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
 		env.execute();
 
 		expectedResult = "1	1\n" +
@@ -257,41 +304,36 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
-	public void testOutDegreesSet() throws Exception {
+	public void testInDegreesNotSet() throws Exception {
 
 		/*
-		 * Test that if the degrees are set, the out degrees can be accessed in every superstep and the value
-		 * is correctly computed.
+		 * Test that if the degrees option is not set, then -1 is returned as a value for in-degree.
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
 		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
 				TestGraphUtils.getLongLongEdges(), env);
 
-		// configure the iteration
-		IterationConfiguration parameters = new IterationConfiguration();
-
-		parameters.setOptDegrees(true);
+		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
+				new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices();
 
-		DataSet<Vertex<Long, Long>> verticesWithOutDegree = graph.runVertexCentricIteration(new UpdateFunctionOutDegree(),
-				new DummyMessageFunction(), 5, parameters).getVertices();
-
-		verticesWithOutDegree.writeAsCsv(resultPath, "\n", "\t");
+		verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
 		env.execute();
 
-		expectedResult = "1	2\n" +
-				"2	1\n" +
-				"3	2\n" +
-				"4	1\n" +
-				"5	1";
+		expectedResult = "1	-1\n" +
+				"2	-1\n" +
+				"3	-1\n" +
+				"4	-1\n" +
+				"5	-1";
 	}
 
 	@Test
-	public void testNumVerticesSet() throws Exception {
+	public void testOutDegreesSet() throws Exception {
 
 		/*
-		 * Test that if the number of vertices option is set, it can be accessed in every superstep and the value
-		 * is correctly computed.
+		 * Test that if the degrees are set, they can be accessed in every superstep
+		 * inside the update function and the value
+		 * is correctly computed for degrees in the messaging function.
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -299,56 +341,45 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdges(), env);
 
 		// configure the iteration
-		IterationConfiguration parameters = new IterationConfiguration();
+		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
 
-		parameters.setOptNumVertices(true);
+		parameters.setOptDegrees(true);
 
-		DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
-				new DummyMessageFunction(), 5, parameters).getVertices();
+		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
+				new UpdateFunctionOutDegrees(), new DegreesMessageFunction(), 5, parameters).getVertices();
 
-		verticesWithNumVertices.writeAsCsv(resultPath, "\n", "\t");
+		verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
 		env.execute();
 
-		expectedResult = "1	5\n" +
-				"2	5\n" +
-				"3	5\n" +
-				"4	5\n" +
-				"5	5";
+		expectedResult = "1	2\n" +
+				"2	1\n" +
+				"3	2\n" +
+				"4	1\n" +
+				"5	1";
 	}
 
 	@Test
-	public void testDegrees() throws Exception {
+	public void testOutDegreesNotSet() throws Exception {
 
 		/*
-		 * Test that if the degrees are set, they can be accessed in every superstep and the value
-		 * is correctly computed for both in and out degrees.
+		 * Test that if the degrees option is not set, then -1 is returned as a value for out-degree.
 		 */
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
-		Graph<Long, Tuple3<Long, Long, Boolean>, Long> graph = Graph.fromCollection(TestGraphUtils.getLongVerticesWithDegrees(),
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
 				TestGraphUtils.getLongLongEdges(), env);
 
-		// configure the iteration
-		IterationConfiguration parameters = new IterationConfiguration();
-
-		parameters.setOptDegrees(true);
-
-		DataSet<Vertex<Long, Tuple3<Long, Long, Boolean>>> verticesWithDegrees = graph.runVertexCentricIteration(
-				new UpdateFunctionDegrees(), new DegreeMessageFunction(), 5, parameters).getVertices();
+		DataSet<Vertex<Long, Long>> verticesWithDegrees = graph.runVertexCentricIteration(
+				new UpdateFunctionInDegrees(), new DummyMessageFunction(), 2).getVertices();
 
-		verticesWithDegrees.map(new MapFunction<Vertex<Long,Tuple3<Long,Long,Boolean>>, Tuple2<Long, Boolean>>() {
-			@Override
-			public Tuple2<Long, Boolean> map(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex) throws Exception {
-				return new Tuple2<Long, Boolean>(vertex.getId(), vertex.getValue().f2);
-			}
-		}).writeAsCsv(resultPath, "\n", "\t");
+		verticesWithDegrees.writeAsCsv(resultPath, "\n", "\t");
 		env.execute();
 
-		expectedResult = "1	true\n" +
-				"2	true\n" +
-				"3	true\n" +
-				"4	true\n" +
-				"5	true";
+		expectedResult = "1	-1\n" +
+				"2	-1\n" +
+				"3	-1\n" +
+				"4	-1\n" +
+				"5	-1";
 	}
 
 	@Test
@@ -364,7 +395,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdges(), env);
 
 		// configure the iteration
-		IterationConfiguration parameters = new IterationConfiguration();
+		VertexCentricConfiguration parameters = new VertexCentricConfiguration();
 
 		parameters.setOptDegrees(true);
 		parameters.setDirection(EdgeDirection.ALL);
@@ -399,12 +430,36 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 			
 			// test aggregator
 			aggregator = getIterationAggregator("superstepAggregator");
+
+			// test number of vertices
+			Assert.assertEquals(5, getNumberOfVertices());
+			
 		}
 
 		@Override
 		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
 			long superstep = getSuperstepNumber();
 			aggregator.aggregate(superstep);
+
+			setNewVertexValue(vertex.getValue() + 1);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class UpdateFunctionDefault extends VertexUpdateFunction<Long, Long, Long> {
+
+		LongSumAggregator aggregator = new LongSumAggregator();
+
+		@Override
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+
+			// test number of vertices
+			Assert.assertEquals(-1, getNumberOfVertices());
+
+			// test degrees
+			Assert.assertEquals(-1, getInDegree());
+			Assert.assertEquals(-1, getOutDegree());
+
 			setNewVertexValue(vertex.getValue() + 1);
 		}
 	}
@@ -421,6 +476,9 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 			Assert.assertEquals(4, bcastSet.get(0));
 			Assert.assertEquals(5, bcastSet.get(1));
 			Assert.assertEquals(6, bcastSet.get(2));
+
+			// test number of vertices
+			Assert.assertEquals(5, getNumberOfVertices());
 			
 			// test aggregator
 			if (getSuperstepNumber() == 2) {
@@ -437,28 +495,18 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class UpdateFunctionInDegree extends VertexUpdateFunction<Long, Long, Long> {
+	public static final class MessageFunctionDefault extends MessagingFunction<Long, Long, Long, Long> {
 
 		@Override
-		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
-			try {
-				setNewVertexValue(((VertexWithDegrees) vertex).getInDegree());
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static final class UpdateFunctionOutDegree extends VertexUpdateFunction<Long, Long, Long> {
+		public void sendMessages(Vertex<Long, Long> vertex) {
+			// test number of vertices
+			Assert.assertEquals(-1, getNumberOfVertices());
 
-		@Override
-		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
-			try {
-				setNewVertexValue(((VertexWithDegrees) vertex).getOutDegree());
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
+			// test degrees
+			Assert.assertEquals(-1, getInDegree());
+			Assert.assertEquals(-1, getOutDegree());
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertex.getValue());
 		}
 	}
 
@@ -467,11 +515,7 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 
 		@Override
 		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
-			try {
 				setNewVertexValue(getNumberOfVertices());
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
 		}
 	}
 
@@ -495,8 +539,25 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class VertexUpdateDirection extends VertexUpdateFunction<Long, HashSet<Long>,
-			Long> {
+	public static final class DegreesMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+
+		@Override
+		public void sendMessages(Vertex<Long, Long> vertex) {
+			if (vertex.getId().equals(1)) {
+				Assert.assertEquals(2, getOutDegree());
+				Assert.assertEquals(1, getInDegree());
+			}
+			else if(vertex.getId().equals(3)) {
+				Assert.assertEquals(2, getOutDegree());
+				Assert.assertEquals(2, getInDegree());
+			}
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertex.getValue());
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class VertexUpdateDirection extends VertexUpdateFunction<Long, HashSet<Long>, Long> {
 
 		@Override
 		public void updateVertex(Vertex<Long, HashSet<Long>> vertex, MessageIterator<Long> messages) throws Exception {
@@ -511,6 +572,26 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
+	public static final class UpdateFunctionInDegrees extends VertexUpdateFunction<Long, Long, Long> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+			long inDegree = getInDegree();
+			setNewVertexValue(inDegree);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class UpdateFunctionOutDegrees extends VertexUpdateFunction<Long, Long, Long> {
+
+		@Override
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+			long outDegree = getOutDegree();
+			setNewVertexValue(outDegree);
+		}
+	}
+
+	@SuppressWarnings("serial")
 	public static final class VertexUpdateNumNeighbors extends VertexUpdateFunction<Long, Boolean,
 			Long> {
 
@@ -519,25 +600,21 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 
 			long count = 0;
 
-			for(long msg : messages) {
+			for(@SuppressWarnings("unused") long msg : messages) {
 				count++;
 			}
-
-			setNewVertexValue(count == (((VertexWithDegrees)vertex).getInDegree() + ((VertexWithDegrees)vertex).getOutDegree()));
+			setNewVertexValue(count == (getInDegree() + getOutDegree()));
 		}
 	}
 
 	@SuppressWarnings("serial")
-	public static final class UpdateFunctionDegrees extends VertexUpdateFunction<Long, Tuple3<Long, Long, Boolean>, Long> {
+	public static final class UpdateFunctionDegrees extends VertexUpdateFunction<Long, Long, Long> {
 
 		@Override
-		public void updateVertex(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex, MessageIterator<Long> inMessages) {
-			try {
-				setNewVertexValue(new Tuple3(vertex.getValue().f0, vertex.getValue().f1, (((VertexWithDegrees)vertex).getInDegree() == vertex.getValue().f0)
-						&& (((VertexWithDegrees)vertex).getOutDegree() == vertex.getValue().f1) && vertex.getValue().f2));
-			} catch (Exception e) {
-				e.printStackTrace();
-			}
+		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) {
+			long inDegree = getInDegree();
+			long outDegree = getOutDegree();
+			setNewVertexValue(inDegree + outDegree);
 		}
 	}
 
@@ -594,16 +671,6 @@ public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	public static final class DegreeMessageFunction extends MessagingFunction<Long, Tuple3<Long, Long, Boolean>, Long, Long> {
-
-		@Override
-		public void sendMessages(Vertex<Long, Tuple3<Long, Long, Boolean>> vertex) {
-			//send message to keep vertices active
-			sendMessageToAllNeighbors(vertex.getValue().f0);
-		}
-	}
-
-	@SuppressWarnings("serial")
 	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
 
 		public Long map(Vertex<Long, Long> value) {

http://git-wip-us.apache.org/repos/asf/flink/blob/e2f28d47/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java
deleted file mode 100644
index 5c57f47..0000000
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationWithExceptionITCase.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.test;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.io.DiscardingOutputFormat;
-import org.apache.flink.configuration.ConfigConstants;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.VertexWithDegrees;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.test.util.ForkableFlinkMiniCluster;
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
-import org.junit.Test;
-
-import static org.junit.Assert.fail;
-
-public class VertexCentricConfigurationWithExceptionITCase {
-
-	private static final int PARALLELISM = 4;
-
-	private static ForkableFlinkMiniCluster cluster;
-
-
-	@BeforeClass
-	public static void setupCluster() {
-		try {
-			Configuration config = new Configuration();
-			config.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, PARALLELISM);
-			cluster = new ForkableFlinkMiniCluster(config, false);
-		}
-		catch (Exception e) {
-			e.printStackTrace();
-			fail("Error starting test cluster: " + e.getMessage());
-		}
-	}
-
-	@AfterClass
-	public static void tearDownCluster() {
-		try {
-			cluster.stop();
-		}
-		catch (Throwable t) {
-			t.printStackTrace();
-			fail("Cluster shutdown caused an exception: " + t.getMessage());
-		}
-	}
-
-	@Test
-	public void testOutDegreesNotSet() throws Exception {
-
-		/*
-		 * Test that if the degrees are not set, the out degrees cannot be accessed - an
-		 * exception is thrown.
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
-				TestGraphUtils.getLongLongEdges(), env);
-
-		try {
-			DataSet<Vertex<Long, Long>> verticesWithOutDegrees = graph.runVertexCentricIteration(new UpdateFunctionOutDegree(),
-					new DummyMessageFunction(), 5).getVertices();
-
-			verticesWithOutDegrees.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-			env.execute();
-
-			fail("The degree option not set test did not fail");
-		} catch (Exception e) {
-			// We expect the job to fail with an exception
-		}
-	}
-
-	@Test
-	public void testInDegreesNotSet() throws Exception {
-
-		/*
-		 * Test that if the degrees are not set, the in degrees cannot be accessed - an
-		 * exception is thrown.
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
-				TestGraphUtils.getLongLongEdges(), env);
-
-		try {
-			DataSet<Vertex<Long, Long>> verticesWithInDegrees = graph.runVertexCentricIteration(new UpdateFunctionInDegree(),
-					new DummyMessageFunction(), 5).getVertices();
-
-			verticesWithInDegrees.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-			env.execute();
-
-			fail("The degree option not set test did not fail");
-		} catch (Exception e) {
-			// We expect the job to fail with an exception
-		}
-	}
-
-	@Test
-	public void testNumVerticesNotSet() throws Exception {
-
-		/*
-		 * Test that if the number of vertices option is not set, this number cannot be accessed -
-		 * an exception id thrown.
-		 */
-		final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(
-				"localhost", cluster.getJobManagerRPCPort());
-		env.setParallelism(PARALLELISM);
-		env.getConfig().disableSysoutLogging();
-
-		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(),
-				TestGraphUtils.getLongLongEdges(), env);
-
-		try {
-			DataSet<Vertex<Long, Long>> verticesWithNumVertices = graph.runVertexCentricIteration(new UpdateFunctionNumVertices(),
-					new DummyMessageFunction(), 5).getVertices();
-
-			verticesWithNumVertices.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
-			env.execute();
-
-			fail("The num vertices option not set test did not fail");
-		} catch (Exception e) {
-			// We expect the job to fail with an exception
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static final class UpdateFunctionInDegree extends VertexUpdateFunction<Long, Long, Long> {
-
-		@Override
-		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
-			setNewVertexValue(((VertexWithDegrees) vertex).getInDegree());
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static final class UpdateFunctionOutDegree extends VertexUpdateFunction<Long, Long, Long> {
-
-		@Override
-		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
-			setNewVertexValue(((VertexWithDegrees)vertex).getOutDegree());
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static final class UpdateFunctionNumVertices extends VertexUpdateFunction<Long, Long, Long> {
-
-		@Override
-		public void updateVertex(Vertex<Long, Long> vertex, MessageIterator<Long> inMessages) throws Exception {
-			setNewVertexValue(getNumberOfVertices());
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static final class DummyMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
-
-		@Override
-		public void sendMessages(Vertex<Long, Long> vertex) {
-			//send message to keep vertices active
-			sendMessageToAllNeighbors(vertex.getValue());
-		}
-	}
-}