You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/10 10:29:25 UTC

[1/4] flink git commit: [FLINK-1694] [gelly] added IterationConfiguration as a way to configure a VertexCentricIteration

Repository: flink
Updated Branches:
  refs/heads/master e45f13f53 -> c518df944


[FLINK-1694] [gelly] added IterationConfiguration as a way to configure a VertexCentricIteration


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e98bd853
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e98bd853
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e98bd853

Branch: refs/heads/master
Commit: e98bd85384946f9c6834542a39e0eb63c2c95f15
Parents: e45f13f
Author: vasia <va...@gmail.com>
Authored: Sun Mar 29 23:39:08 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Fri Apr 10 10:53:02 2015 +0300

----------------------------------------------------------------------
 docs/gelly_guide.md                             |  79 ++++++-
 .../main/java/org/apache/flink/graph/Graph.java |  45 +++-
 .../flink/graph/library/LabelPropagation.java   |   6 +-
 .../apache/flink/graph/library/PageRank.java    |   6 +-
 .../graph/library/SimpleCommunityDetection.java |   8 +-
 .../library/SingleSourceShortestPaths.java      |  10 +-
 .../graph/spargel/IterationConfiguration.java   | 192 +++++++++++++++++
 .../graph/spargel/VertexCentricIteration.java   | 168 ++++-----------
 .../graph/spargel/VertexUpdateFunction.java     |   2 +-
 .../test/CollectionModeSuperstepITCase.java     |   7 +-
 .../test/VertexCentricConfigurationITCase.java  | 213 +++++++++++++++++++
 .../VertexCentricConnectedComponentsITCase.java |   4 +-
 12 files changed, 565 insertions(+), 175 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 9cceafc..203cffa 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -347,29 +347,86 @@ Vertex-centric Iterations
 Gelly wraps Flink's [Spargel API](spargel_guide.html) to provide methods for vertex-centric iterations.
 Like in Spargel, the user only needs to implement two functions: a `VertexUpdateFunction`, which defines how a vertex will update its value
 based on the received messages and a `MessagingFunction`, which allows a vertex to send out messages for the next superstep.
-These functions are given as parameters to Gelly's `createVertexCentricIteration`, which returns a `VertexCentricIteration`. 
-The user can configure this iteration (set the name, the parallelism, aggregators, etc.) and then run the computation, using the `runVertexCentricIteration` method:
+These functions and the maximum number of iterations to run are given as parameters to Gelly's `runVertexCentricIteration`.
+This method will execute the vertex-centric iteration on the input Graph and return a new Graph, with updated vertex values:
 
 {% highlight java %}
 Graph<Long, Double, Double> graph = ...
 
-// create the vertex-centric iteration
-VertexCentricIteration<Long, Double, Double, Double> iteration = 
-			graph.createVertexCentricIteration(
+// run Single-Source-Shortest-Paths vertex-centric iteration
+Graph<Long, Double, Double> result = 
+			graph.runVertexCentricIteration(
 			new VertexDistanceUpdater(), new MinDistanceMessenger(), maxIterations);
 
+// user-defined functions
+public static final class VertexDistanceUpdater {...}
+public static final class MinDistanceMessenger {...}
+
+{% endhighlight %}
+
+### Configuring a Vertex-Centric Iteration
+A vertex-centric iteration can be configured using an `IterationConfiguration` object.
+Currently, the following parameters can be specified:
+
+* <strong>Name</strong>: The name for the vertex-centric iteration. The name is displayed in logs and messages 
+and can be specified using the `setName()` method.
+
+* <strong>Parallelism</strong>: The parallelism for the iteration. It can be set using the `setParallelism()` method.	
+
+* <strong>Solution set in unmanaged memory</strong>: Defines whether the solution set is kept in managed memory (Flink's internal way of keeping object in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the `setSolutionSetUnmanagedMemory()` method.
+
+* <strong>Aggregators</strong>: Iteration aggregators can be registered using the `registerAggregator()` method. An iteration aggregator combines
+all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined `VertexUpdateFunction` and `MessagingFunction`.
+
+* <strong>Broadcast Variables</strong>: DataSets can be added as [Broadcast Variables](programming_guide.html#broadcast-variables) to the `VertexUpdateFunction` and `MessagingFunction`, using the `addBroadcastSetForUpdateFunction()` and `addBroadcastSetForMessagingFunction()` methods, respectively.
+
+{% highlight java %}
+
+Graph<Long, Double, Double> graph = ...
+
+// configure the iteration
+IterationConfiguration parameters = new IterationConfiguration();
+
 // set the iteration name
-iteration.setName("Single Source Shortest Paths");
+parameters.setName("Gelly Iteration");
 
 // set the parallelism
-iteration.setParallelism(16);
+parameters.setParallelism(16);
+
+// register an aggregator
+parameters.registerAggregator("sumAggregator", new LongSumAggregator());
 
-// run the computation
-graph.runVertexCentricIteration(iteration);
+// run the vertex-centric iteration, also passing the configuration parameters
+Graph<Long, Double, Double> result = 
+			graph.runVertexCentricIteration(
+			new VertexUpdater(), new Messenger(), maxIterations, parameters);
 
 // user-defined functions
-public static final class VertexDistanceUpdater {...}
-public static final class MinDistanceMessenger {...}
+public static final class VertexUpdater extends VertexUpdateFunction {
+
+	LongSumAggregator aggregator = new LongSumAggregator();
+
+	public void preSuperstep() {
+	
+		// retrieve the Aggregator
+		aggregator = getIterationAggregator("sumAggregator");
+	}
+
+
+	public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator inMessages) {
+		
+		//do some computation
+		Long partialValue = ...
+
+		// aggregate the partial value
+		aggregator.aggregate(partialValue);
+
+		// update the vertex value
+		setNewVertexValue(...);
+	}
+}
+
+public static final class Messenger extends MessagingFunction {...}
 
 {% endhighlight %}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index a73beaf..8280ba9 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -47,6 +47,7 @@ import org.apache.flink.api.java.tuple.Tuple4;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.graph.spargel.IterationConfiguration;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
@@ -1149,30 +1150,52 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	}
 
 	/**
-	 * Create a Vertex-Centric iteration on the graph.
-	 * 
+	 * Runs a Vertex-Centric iteration on the graph.
+	 * No configuration options are provided.
+
 	 * @param vertexUpdateFunction the vertex update function
 	 * @param messagingFunction the messaging function
 	 * @param maximumNumberOfIterations maximum number of iterations to perform
-	 * @return
+	 * 
+	 * @return the updated Graph after the vertex-centric iteration has converged or
+	 * after maximumNumberOfIterations.
 	 */
-	public <M> VertexCentricIteration<K, VV, M, EV> createVertexCentricIteration(
+	public <M> Graph<K, VV, EV> runVertexCentricIteration(
 			VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
 			MessagingFunction<K, VV, M, EV> messagingFunction,
 			int maximumNumberOfIterations) {
-		return VertexCentricIteration.withEdges(edges, vertexUpdateFunction,
-				messagingFunction, maximumNumberOfIterations);
+
+		VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
+				edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
+
+		DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
+
+		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
 	}
-    
+
 	/**
-	 * Runs a Vertex-Centric iteration on the graph.
+	 * Runs a Vertex-Centric iteration on the graph with configuration options.
+	 * 
+	 * @param vertexUpdateFunction the vertex update function
+	 * @param messagingFunction the messaging function
+	 * @param maximumNumberOfIterations maximum number of iterations to perform
+	 * @param parameters the iteration configuration parameters
 	 * 
-	 * @param iteration the Vertex-Centric iteration to run
-	 * @return
+	 * @return the updated Graph after the vertex-centric iteration has converged or
+	 * after maximumNumberOfIterations.
 	 */
 	public <M> Graph<K, VV, EV> runVertexCentricIteration(
-			VertexCentricIteration<K, VV, M, EV> iteration) {
+			VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
+			MessagingFunction<K, VV, M, EV> messagingFunction,
+			int maximumNumberOfIterations, IterationConfiguration parameters) {
+
+		VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
+				edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
+
+		iteration.configure(parameters);
+
 		DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
+
 		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
index 33a04e7..ff6fe85 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -22,7 +22,6 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.types.NullValue;
 
@@ -56,9 +55,8 @@ public class LabelPropagation<K extends Comparable<K> & Serializable>
 
 		// iteratively adopt the most frequent label among the neighbors
 		// of each vertex
-		VertexCentricIteration<K, Long, Long, NullValue> iteration = input.createVertexCentricIteration(
-				new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(), maxIterations);
-		return input.runVertexCentricIteration(iteration);
+		return input.runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
+				maxIterations);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 00ae204..48c9a51 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 public class PageRank<K extends Comparable<K> & Serializable> implements
@@ -43,11 +42,8 @@ public class PageRank<K extends Comparable<K> & Serializable> implements
 	public Graph<K, Double, Double> run(Graph<K, Double, Double> network) throws Exception {
 
 		final long numberOfVertices = network.numberOfVertices();
-
-		VertexCentricIteration<K, Double, Double, Double> iteration = network.createVertexCentricIteration(
-				new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
+		return network.runVertexCentricIteration(new VertexRankUpdater<K>(beta, numberOfVertices), new RankMessenger<K>(numberOfVertices),
 				maxIterations);
-		return network.runVertexCentricIteration(iteration);
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
index fb32781..e3d3e1c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SimpleCommunityDetection.java
@@ -26,7 +26,6 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 import java.util.Map;
@@ -65,11 +64,8 @@ public class SimpleCommunityDetection implements GraphAlgorithm<Long, Long, Doub
 		Graph<Long, Tuple2<Long, Double>, Double> graphWithScoredVertices = undirectedGraph
 				.mapVertices(new AddScoreToVertexValuesMapper());
 
-		VertexCentricIteration<Long, Tuple2<Long, Double>, Tuple2<Long, Double>, Double>
-				iteration = graphWithScoredVertices.createVertexCentricIteration(new VertexLabelUpdater(delta),
-				new LabelMessenger(), maxIterations);
-
-		return graphWithScoredVertices.runVertexCentricIteration(iteration)
+		return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater(delta),
+				new LabelMessenger(), maxIterations)
 				.mapVertices(new RemoveScoreFromVertexValuesMapper());
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
index 3e9a29d..262b2c5 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 import java.io.Serializable;
@@ -45,12 +44,9 @@ public class SingleSourceShortestPaths<K extends Comparable<K> & Serializable>
 	@Override
 	public Graph<K, Double, Double> run(Graph<K, Double, Double> input) {
 
-		Graph<K, Double, Double> mappedInput = input.mapVertices(new InitVerticesMapper<K>(srcVertexId));
-
-		VertexCentricIteration<K, Double, Double, Double> iteration = mappedInput.createVertexCentricIteration(
-				new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(), maxIterations);
-
-		return mappedInput.runVertexCentricIteration(iteration);
+		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+				.runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
+				maxIterations);
 	}
 
 	public static final class InitVerticesMapper<K extends Comparable<K> & Serializable>

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java
new file mode 100644
index 0000000..f161d8d
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/IterationConfiguration.java
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.Validate;
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * This class is used to configure a vertex-centric iteration.
+ *
+ * An IterationConfiguration object can be used to set the iteration name and
+ * degree of parallelism, to register aggregators and use broadcast sets in
+ * the {@link VertexUpdateFunction} and {@link MessagingFunction}.
+ *
+ * The IterationConfiguration object is passed as an argument to
+ * {@link org.apache.flink.graph.Graph#runVertexCentricIteration(
+ * VertexUpdateFunction, MessagingFunction, int, IterationConfiguration)}.
+ *
+ */
+public class IterationConfiguration {
+
+	/** the iteration name **/
+	private String name;
+
+	/** the iteration parallelism **/
+	private int parallelism = -1;
+
+	/** the iteration aggregators **/
+	private Map<String, Aggregator<?>> aggregators = new HashMap<String, Aggregator<?>>();
+
+	/** the broadcast variables for the update function **/
+	private List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>();
+
+	/** the broadcast variables for the messaging function **/
+	private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>();
+
+	/** flag that defines whether the solution set is kept in managed memory **/
+	private boolean unmanagedSolutionSet = false;
+	
+	public IterationConfiguration() {}
+
+
+	/**
+	 * Sets the name for the vertex-centric iteration. The name is displayed in logs and messages.
+	 * 
+	 * @param name The name for the iteration.
+	 */
+	public void setName(String name) {
+		this.name = name;
+	}
+
+	/**
+	 * Gets the name of the vertex-centric iteration.
+	 * @param defaultName 
+	 * 
+	 * @return The name of the iteration.
+	 */
+	public String getName(String defaultName) {
+		if (name != null) {
+			return name;			
+		}
+		else {
+			return defaultName;
+		}
+	}
+
+	/**
+	 * Sets the parallelism for the iteration.
+	 * 
+	 * @param parallelism The parallelism.
+	 */
+	public void setParallelism(int parallelism) {
+		Validate.isTrue(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
+		this.parallelism = parallelism;
+	}
+	
+	/**
+	 * Gets the iteration's parallelism.
+	 * 
+	 * @return The iterations parallelism, or -1, if not set.
+	 */
+	public int getParallelism() {
+		return parallelism;
+	}
+
+	/**
+	 * Defines whether the solution set is kept in managed memory (Flink's internal way of keeping object
+	 * in serialized form) or as a simple object map.
+	 * By default, the solution set runs in managed memory.
+	 * 
+	 * @param unmanaged True, to keep the solution set in unmanaged memory, false otherwise.
+	 */
+	public void setSolutionSetUnmanagedMemory(boolean unmanaged) {
+		this.unmanagedSolutionSet = unmanaged;
+	}
+	
+	/**
+	 * Gets whether the solution set is kept in managed memory (Flink's internal way of keeping object
+	 * in serialized form) or as a simple object map.
+	 * By default, the solution set runs in managed memory.
+	 * 
+	 * @return True, if the solution set is in unmanaged memory, false otherwise.
+	 */
+	public boolean isSolutionSetUnmanagedMemory() {
+		return this.unmanagedSolutionSet;
+	}
+
+	/**
+	 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
+	 * via {@link VertexUpdateFunction#getIterationAggregator(String)} and
+	 * {@link VertexUpdateFunction#getPreviousIterationAggregate(String)}.
+	 * 
+	 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution. 
+	 * @param aggregator The aggregator.
+	 */
+	public void registerAggregator(String name, Aggregator<?> aggregator) {
+		this.aggregators.put(name, aggregator);
+	}
+	
+	/**
+	 * Adds a data set as a broadcast set to the messaging function.
+	 * 
+	 * @param name The name under which the broadcast data is available in the messaging function.
+	 * @param data The data set to be broadcasted.
+	 */
+	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
+		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Adds a data set as a broadcast set to the vertex update function.
+	 * 
+	 * @param name The name under which the broadcast data is available in the vertex update function.
+	 * @param data The data set to be broadcasted.
+	 */
+	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
+		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Gets the set of aggregators that are registered for this vertex-centric iteration.
+	 *
+	 * @return a Map of the registered aggregators, where the key is the aggregator name
+	 * and the value is the Aggregator object
+	 */
+	public Map<String, Aggregator<?>> getAggregators() {
+		return this.aggregators;
+	}
+
+	/**
+	 * Get the broadcast variables of the VertexUpdateFunction.
+	 *
+	 * @return a List of Tuple2, where the first field is the broadcast variable name
+	 * and the second field is the broadcast DataSet.
+	 */
+	public List<Tuple2<String, DataSet<?>>> getUpdateBcastVars() {
+		return this.bcVarsUpdate;
+	}
+
+	/**
+	 * Get the broadcast variables of the MessagingFunction.
+	 *
+	 * @return a List of Tuple2, where the first field is the broadcast variable name
+	 * and the second field is the broadcast DataSet.
+	 */
+	public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
+		return this.bcVarsMessaging;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
index c54ee0c..ca66521 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -19,10 +19,7 @@
 package org.apache.flink.graph.spargel;
 
 import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.HashMap;
 import java.util.Iterator;
-import java.util.List;
 import java.util.Map;
 
 import org.apache.commons.lang3.Validate;
@@ -82,23 +79,13 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 	
 	private final DataSet<Edge<VertexKey, EdgeValue>> edgesWithValue;
 	
-	private final Map<String, Aggregator<?>> aggregators;
-	
 	private final int maximumNumberOfIterations;
 	
-	private final List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-	
-	private final List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>(4);
-	
 	private final TypeInformation<Message> messageType;
 	
 	private DataSet<Vertex<VertexKey, VertexValue>> initialVertices;
-	
-	private String name;
-	
-	private int parallelism = -1;
-	
-	private boolean unmanagedSolutionSet;
+
+	private IterationConfiguration configuration;
 	
 	// ----------------------------------------------------------------------------------
 	
@@ -115,8 +102,7 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 		this.updateFunction = uf;
 		this.messagingFunction = mf;
 		this.edgesWithValue = edgesWithValue;
-		this.maximumNumberOfIterations = maximumNumberOfIterations;
-		this.aggregators = new HashMap<String, Aggregator<?>>();		
+		this.maximumNumberOfIterations = maximumNumberOfIterations;		
 		this.messageType = getMessageType(mf);
 	}
 	
@@ -124,97 +110,6 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 		return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
 	}
 	
-	/**
-	 * Registers a new aggregator. Aggregators registered here are available during the execution of the vertex updates
-	 * via {@link VertexUpdateFunction#getIterationAggregator(String)} and
-	 * {@link VertexUpdateFunction#getPreviousIterationAggregate(String)}.
-	 * 
-	 * @param name The name of the aggregator, used to retrieve it and its aggregates during execution. 
-	 * @param aggregator The aggregator.
-	 */
-	public void registerAggregator(String name, Aggregator<?> aggregator) {
-		this.aggregators.put(name, aggregator);
-	}
-	
-	/**
-	 * Adds a data set as a broadcast set to the messaging function.
-	 * 
-	 * @param name The name under which the broadcast data is available in the messaging function.
-	 * @param data The data set to be broadcasted.
-	 */
-	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
-		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-
-	/**
-	 * Adds a data set as a broadcast set to the vertex update function.
-	 * 
-	 * @param name The name under which the broadcast data is available in the vertex update function.
-	 * @param data The data set to be broadcasted.
-	 */
-	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
-		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
-	}
-	
-	/**
-	 * Sets the name for the vertex-centric iteration. The name is displayed in logs and messages.
-	 * 
-	 * @param name The name for the iteration.
-	 */
-	public void setName(String name) {
-		this.name = name;
-	}
-	
-	/**
-	 * Gets the name from this vertex-centric iteration.
-	 * 
-	 * @return The name of the iteration.
-	 */
-	public String getName() {
-		return name;
-	}
-	
-	/**
-	 * Sets the parallelism for the iteration.
-	 * 
-	 * @param parallelism The parallelism.
-	 */
-	public void setParallelism(int parallelism) {
-		Validate.isTrue(parallelism > 0 || parallelism == -1, "The parallelism must be positive, or -1 (use default).");
-		this.parallelism = parallelism;
-	}
-	
-	/**
-	 * Gets the iteration's parallelism.
-	 * 
-	 * @return The iterations parallelism, or -1, if not set.
-	 */
-	public int getParallelism() {
-		return parallelism;
-	}
-	
-	/**
-	 * Defines whether the solution set is kept in managed memory (Flink's internal way of keeping object
-	 * in serialized form) or as a simple object map.
-	 * By default, the solution set runs in managed memory.
-	 * 
-	 * @param unmanaged True, to keep the solution set in unmanaged memory, false otherwise.
-	 */
-	public void setSolutionSetUnmanagedMemory(boolean unmanaged) {
-		this.unmanagedSolutionSet = unmanaged;
-	}
-	
-	/**
-	 * Gets whether the solution set is kept in managed memory (Flink's internal way of keeping object
-	 * in serialized form) or as a simple object map.
-	 * By default, the solution set runs in managed memory.
-	 * 
-	 * @return True, if the solution set is in unmanaged memory, false otherwise.
-	 */
-	public boolean isSolutionSetUnmanagedMemory() {
-		return this.unmanagedSolutionSet;
-	}
-	
 	// --------------------------------------------------------------------------------------------
 	//  Custom Operator behavior
 	// --------------------------------------------------------------------------------------------
@@ -249,20 +144,27 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 		TypeInformation<VertexKey> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
 		TypeInformation<Tuple2<VertexKey, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<VertexKey,Message>>(keyType, messageType);
 
-		// set up the iteration operator
-		final String name = (this.name != null) ? this.name :
-			"Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")";
 		final int[] zeroKeyPos = new int[] {0};
 	
 		final DeltaIteration<Vertex<VertexKey, VertexValue>, Vertex<VertexKey, VertexValue>> iteration =
 			this.initialVertices.iterateDelta(this.initialVertices, this.maximumNumberOfIterations, zeroKeyPos);
-		iteration.name(name);
-		iteration.parallelism(parallelism);
-		iteration.setSolutionSetUnManaged(unmanagedSolutionSet);
-		
-		// register all aggregators
-		for (Map.Entry<String, Aggregator<?>> entry : this.aggregators.entrySet()) {
-			iteration.registerAggregator(entry.getKey(), entry.getValue());
+
+		// set up the iteration operator
+		if (this.configuration != null) {
+
+			iteration.name(this.configuration.getName(
+					"Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"));
+			iteration.parallelism(this.configuration.getParallelism());
+			iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
+
+			// register all aggregators
+			for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
+				iteration.registerAggregator(entry.getKey(), entry.getValue());
+			}
+		}
+		else {
+			// no configuration provided; set default name
+			iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
 		}
 		
 		// build the messaging function (co group)
@@ -272,8 +174,11 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 		
 		// configure coGroup message function with name and broadcast variables
 		messages = messages.name("Messaging");
-		for (Tuple2<String, DataSet<?>> e : this.bcVarsMessaging) {
-			messages = messages.withBroadcastSet(e.f1, e.f0);
+
+		if (this.configuration != null) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
+				messages = messages.withBroadcastSet(e.f1, e.f0);
+			}			
 		}
 		
 		VertexUpdateUdf<VertexKey, VertexValue, Message> updateUdf = new VertexUpdateUdf<VertexKey, VertexValue, Message>(updateFunction, vertexTypes);
@@ -284,8 +189,11 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 		
 		// configure coGroup update function with name and broadcast variables
 		updates = updates.name("Vertex State Updates");
-		for (Tuple2<String, DataSet<?>> e : this.bcVarsUpdate) {
-			updates = updates.withBroadcastSet(e.f1, e.f0);
+
+		if (this.configuration != null) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) {
+				updates = updates.withBroadcastSet(e.f1, e.f0);
+			}			
 		}
 
 		// let the operator know that we preserve the key field
@@ -452,4 +360,20 @@ public class VertexCentricIteration<VertexKey extends Comparable<VertexKey> & Se
 			return this.resultType;
 		}
 	}
+
+	/**
+	 * Configures this vertex-centric iteration with the provided parameters.
+	 *
+	 * @param parameters the configuration parameters
+	 */
+	public void configure(IterationConfiguration parameters) {
+		this.configuration = parameters;
+	}
+
+	/**
+	 * @return the configuration parameters of this vertex-centric iteration
+	 */
+	public IterationConfiguration getIterationConfiguration() {
+		return this.configuration;
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
index 1157a18..5a7cd5c 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -92,7 +92,7 @@ public abstract class VertexUpdateFunction<VertexKey extends Comparable<VertexKe
 	}
 	
 	/**
-	 * Gets the iteration aggregator registered under the given name. The iteration aggregator is combines
+	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
 	 * all aggregates globally once per superstep and makes them available in the next superstep.
 	 * 
 	 * @param name The name of the aggregator.

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
index ffe91d9..d84952a 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/CollectionModeSuperstepITCase.java
@@ -26,7 +26,6 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
 import org.junit.Assert;
@@ -48,9 +47,8 @@ public class CollectionModeSuperstepITCase {
 		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), 
 				TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
 		
-		VertexCentricIteration<Long, Long, Long, Long> iteration = 
-				graph.createVertexCentricIteration(new UpdateFunction(), new MessageFunction(), 10);
-		Graph<Long, Long, Long> result = graph.runVertexCentricIteration(iteration);
+		Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
+				new UpdateFunction(), new MessageFunction(), 10);
 
 		result.getVertices().map(
 				new VertexToTuple2Map<Long, Long>()).output(
@@ -83,5 +81,4 @@ public class CollectionModeSuperstepITCase {
 			return 1l;
 		}
 	}
-
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
new file mode 100644
index 0000000..b497070
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConfigurationITCase.java
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test;
+
+import java.util.List;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.IterationConfiguration;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexCentricIteration;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.VertexToTuple2Map;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.LongValue;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class VertexCentricConfigurationITCase extends MultipleProgramsTestBase {
+
+	public VertexCentricConfigurationITCase(TestExecutionMode mode){
+		super(mode);
+	}
+
+    private String resultPath;
+    private String expectedResult;
+
+    @Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	@Before
+	public void before() throws Exception{
+		resultPath = tempFolder.newFile().toURI().toString();
+	}
+
+	@After
+	public void after() throws Exception{
+		compareResultsByLinesInMemory(expectedResult, resultPath);
+	}
+
+	@Test
+	public void testRunWithConfiguration() throws Exception {
+		/*
+		 * Test Graph's runVertexCentricIteration when configuration parameters are provided
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		
+		Graph<Long, Long, Long> graph = Graph.fromCollection(TestGraphUtils.getLongLongVertices(), 
+				TestGraphUtils.getLongLongEdges(), env).mapVertices(new AssignOneMapper());
+
+		// create the configuration object
+		IterationConfiguration parameters = new IterationConfiguration();
+
+		parameters.addBroadcastSetForUpdateFunction("updateBcastSet", env.fromElements(1, 2, 3));
+		parameters.addBroadcastSetForMessagingFunction("messagingBcastSet", env.fromElements(4, 5, 6));
+		parameters.registerAggregator("superstepAggregator", new LongSumAggregator());
+
+		Graph<Long, Long, Long> result = graph.runVertexCentricIteration(
+				new UpdateFunction(), new MessageFunction(), 10, parameters);
+
+		result.getVertices().map(new VertexToTuple2Map<Long, Long>()).writeAsCsv(resultPath, "\n", "\t");
+		env.execute();
+		expectedResult = "1	11\n" +
+						"2	11\n" +
+						"3	11\n" +
+						"4	11\n" +
+						"5	11";
+	}
+
+	@Test
+	public void testIterationConfiguration() throws Exception {
+
+		/*
+		 * Test name, parallelism and solutionSetUnmanaged parameters
+		 */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		VertexCentricIteration<Long, Long, Long, Long> iteration = VertexCentricIteration
+				.withEdges(TestGraphUtils.getLongLongEdgeData(env), new DummyUpdateFunction(), 
+						new DummyMessageFunction(), 10);
+		
+		IterationConfiguration parameters = new IterationConfiguration();
+		parameters.setName("gelly iteration");
+		parameters.setParallelism(2);
+		parameters.setSolutionSetUnmanagedMemory(true);
+		
+		iteration.configure(parameters);
+		
+		Assert.assertEquals("gelly iteration", iteration.getIterationConfiguration().getName(""));
+		Assert.assertEquals(2, iteration.getIterationConfiguration().getParallelism());
+		Assert.assertEquals(true, iteration.getIterationConfiguration().isSolutionSetUnmanagedMemory());
+
+		DataSet<Vertex<Long, Long>> result = TestGraphUtils.getLongLongVertexData(env).runOperation(iteration);
+		
+		result.map(new VertexToTuple2Map<Long, Long>()).writeAsCsv(resultPath, "\n", "\t");
+		env.execute();
+		expectedResult = "1	11\n" +
+						"2	12\n" +
+						"3	13\n" +
+						"4	14\n" +
+						"5	15";
+	}
+
+	@SuppressWarnings("serial")
+	public static final class UpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+
+		LongSumAggregator aggregator = new LongSumAggregator();
+
+		@Override
+		public void preSuperstep() {
+			
+			// test bcast variable
+			@SuppressWarnings("unchecked")
+			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("updateBcastSet");
+			Assert.assertEquals(1, bcastSet.get(0));
+			Assert.assertEquals(2, bcastSet.get(1));
+			Assert.assertEquals(3, bcastSet.get(2));
+			
+			// test aggregator
+			aggregator = getIterationAggregator("superstepAggregator");
+		}
+
+		@Override
+		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+			long superstep = getSuperstepNumber();
+			aggregator.aggregate(superstep);
+			setNewVertexValue(vertexValue + 1);
+		}
+	}
+	
+	@SuppressWarnings("serial")
+	public static final class MessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+
+		@Override
+		public void preSuperstep() {
+			
+			// test bcast variable
+			@SuppressWarnings("unchecked")
+			List<Tuple1<Integer>> bcastSet = (List<Tuple1<Integer>>)(List<?>)getBroadcastSet("messagingBcastSet");
+			Assert.assertEquals(4, bcastSet.get(0));
+			Assert.assertEquals(5, bcastSet.get(1));
+			Assert.assertEquals(6, bcastSet.get(2));
+			
+			// test aggregator
+			if (getSuperstepNumber() == 2) {
+				long aggrValue = ((LongValue)getPreviousIterationAggregate("superstepAggregator")).getValue();
+				Assert.assertEquals(5, aggrValue);
+			}
+		}
+
+		@Override
+		public void sendMessages(Long vertexId, Long vertexValue) {
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertexValue);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class DummyUpdateFunction extends VertexUpdateFunction<Long, Long, Long> {
+
+		@Override
+		public void updateVertex(Long vertexKey, Long vertexValue, MessageIterator<Long> inMessages) {
+			setNewVertexValue(vertexValue + 1);
+		}
+	}
+	
+	@SuppressWarnings("serial")
+	public static final class DummyMessageFunction extends MessagingFunction<Long, Long, Long, Long> {
+
+		@Override
+		public void sendMessages(Long vertexId, Long vertexValue) {
+			//send message to keep vertices active
+			sendMessageToAllNeighbors(vertexValue);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class AssignOneMapper implements MapFunction<Vertex<Long, Long>, Long> {
+
+		public Long map(Vertex<Long, Long> value) {
+			return 1l;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e98bd853/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
index 8fb9a11..380e027 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/VertexCentricConnectedComponentsITCase.java
@@ -30,7 +30,6 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 import org.apache.flink.test.testdata.ConnectedComponentsData;
 import org.apache.flink.test.util.JavaProgramTestBase;
@@ -65,8 +64,7 @@ public class VertexCentricConnectedComponentsITCase extends JavaProgramTestBase
 		DataSet<Vertex<Long, Long>> initialVertices = vertexIds.map(new IdAssigner());
 		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env); 
 		
-		VertexCentricIteration<Long, Long, Long, NullValue> iteration = graph.createVertexCentricIteration(new CCUpdater(), new CCMessager(), 100);
-		Graph<Long, Long, NullValue> result = graph.runVertexCentricIteration(iteration);
+		Graph<Long, Long, NullValue> result = graph.runVertexCentricIteration(new CCUpdater(), new CCMessager(), 100);
 		
 		result.getVertices().writeAsCsv(resultPath, "\n", " ");
 		env.execute();


[3/4] flink git commit: [FLINK-1741] [gelly] Adds Jaccard Similarity Metric Example

Posted by va...@apache.org.
[FLINK-1741] [gelly] Adds Jaccard Similarity Metric Example

This closes #544


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e281e4d6
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e281e4d6
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e281e4d6

Branch: refs/heads/master
Commit: e281e4d6fc22e498f56d88b0f661972345bf0e55
Parents: 48713a8
Author: andralungu <lu...@gmail.com>
Authored: Mon Mar 30 14:50:09 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Fri Apr 10 11:04:19 2015 +0300

----------------------------------------------------------------------
 .../JaccardSimilarityMeasureExample.java        | 212 +++++++++++++++++++
 .../utils/JaccardSimilarityMeasureData.java     |  58 +++++
 .../JaccardSimilarityMeasureExampleITCase.java  |  74 +++++++
 3 files changed, 344 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/e281e4d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
new file mode 100644
index 0000000..c81aeb3
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/JaccardSimilarityMeasureExample.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example;
+
+import org.apache.flink.api.common.ProgramDescription;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.EdgesFunction;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.types.NullValue;
+
+import java.util.HashSet;
+
+/**
+ * Given a directed, unweighted graph, return a weighted graph where the edge values are equal
+ * to the Jaccard similarity coefficient - the number of common neighbors divided by the the size
+ * of the union of neighbor sets - for the src and target vertices.
+ *
+ * <p>
+ * Input files are plain text files and must be formatted as follows:
+ * <br>
+ * 	Edges are represented by pairs of srcVertexId, trgVertexId separated by tabs.
+ * 	Edges themselves are separated by newlines.
+ * 	For example: <code>1	2\n1	3\n</code> defines two edges 1-2 and 1-3.
+ * </p>
+ *
+ * Usage <code> JaccardSimilarityMeasureExample &lt;edge path&gt; &lt;result path&gt;</code><br>
+ * If no parameters are provided, the program is run with default data from
+ * {@link org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData}
+ */
+@SuppressWarnings("serial")
+public class JaccardSimilarityMeasureExample implements ProgramDescription {
+
+	public static void main(String [] args) throws Exception {
+
+		if(!parseParameters(args)) {
+			return;
+		}
+
+		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+
+		DataSet<Edge<Long, Double>> edges = getEdgesDataSet(env);
+
+		Graph<Long, NullValue, Double> graph = Graph.fromDataSet(edges, env);
+
+		DataSet<Vertex<Long, HashSet<Long>>> verticesWithNeighbors =
+				graph.reduceOnEdges(new GatherNeighbors(), EdgeDirection.ALL);
+
+		Graph<Long, HashSet<Long>, Double> graphWithVertexValues = Graph.fromDataSet(verticesWithNeighbors, edges, env);
+
+		// the edge value will be the Jaccard similarity coefficient(number of common neighbors/ all neighbors)
+		DataSet<Tuple3<Long, Long, Double>> edgesWithJaccardWeight = graphWithVertexValues.getTriplets()
+				.map(new WeighEdgesMapper());
+
+		DataSet<Edge<Long, Double>> result = graphWithVertexValues.joinWithEdges(edgesWithJaccardWeight,
+				new MapFunction<Tuple2<Double, Double>, Double>() {
+
+					@Override
+					public Double map(Tuple2<Double, Double> value) throws Exception {
+						return value.f1;
+					}
+				}).getEdges();
+
+		// emit result
+		if (fileOutput) {
+			result.writeAsCsv(outputPath, "\n", ",");
+		} else {
+			result.print();
+		}
+
+		env.execute("Executing Jaccard Similarity Measure");
+	}
+
+	@Override
+	public String getDescription() {
+		return "Vertex Jaccard Similarity Measure";
+	}
+
+	/**
+	 * Each vertex will have a HashSet containing its neighbor ids as value.
+	 */
+	private static final class GatherNeighbors implements EdgesFunction<Long, Double, Vertex<Long, HashSet<Long>>> {
+
+		@Override
+		public Vertex<Long, HashSet<Long>> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges) throws Exception {
+
+			HashSet<Long> neighborsHashSet = new HashSet<Long>();
+			long vertexId = -1;
+
+			for(Tuple2<Long, Edge<Long, Double>> edge : edges) {
+				neighborsHashSet.add(getNeighborID(edge));
+				vertexId = edge.f0;
+			}
+			return new Vertex<Long, HashSet<Long>>(vertexId, neighborsHashSet);
+		}
+	}
+
+	/**
+	 * The edge weight will be the Jaccard coefficient, which is computed as follows:
+	 *
+	 * Consider the edge x-y
+	 * We denote by sizeX and sizeY, the neighbors hash set size of x and y respectively.
+	 * sizeX+sizeY = union + intersection of neighborhoods
+	 * size(hashSetX.addAll(hashSetY)).distinct = union of neighborhoods
+	 * The intersection can then be deduced.
+	 *
+	 * The Jaccard similarity coefficient is then, the intersection/union.
+	 */
+	private static class WeighEdgesMapper implements MapFunction<Triplet<Long, HashSet<Long>, Double>,
+			Tuple3<Long, Long, Double>> {
+
+		@Override
+		public Tuple3<Long, Long, Double> map(Triplet<Long, HashSet<Long>, Double> triplet)
+				throws Exception {
+
+			Vertex<Long, HashSet<Long>> source = triplet.getSrcVertex();
+			Vertex<Long, HashSet<Long>> target = triplet.getTrgVertex();
+
+			long unionPlusIntersection = source.getValue().size() + target.getValue().size();
+			// within a HashSet, all elements are distinct
+			source.getValue().addAll(target.getValue());
+			// the source value contains the union
+			long union = source.getValue().size();
+			long intersection = unionPlusIntersection - union;
+
+			return new Tuple3<Long, Long, Double>(source.getId(), target.getId(), (double) intersection/union);
+		}
+	}
+
+	/**
+	 * Helper method that extracts the neighborId given an edge.
+	 * @param edge
+	 * @return
+	 */
+	private static Long getNeighborID(Tuple2<Long, Edge<Long, Double>> edge) {
+		if(edge.f1.getSource() == edge.f0) {
+			return edge.f1.getTarget();
+		} else {
+			return edge.f1.getSource();
+		}
+	}
+
+	// *************************************************************************
+	// UTIL METHODS
+	// *************************************************************************
+
+	private static boolean fileOutput = false;
+	private static String edgeInputPath = null;
+	private static String outputPath = null;
+
+	private static boolean parseParameters(String [] args) {
+		if(args.length > 0) {
+			if(args.length != 2) {
+				System.err.println("Usage JaccardSimilarityMeasureExample <edge path> <output path>");
+				return false;
+			}
+
+			fileOutput = true;
+			edgeInputPath = args[0];
+			outputPath = args[1];
+		} else {
+			System.out.println("Executing JaccardSimilarityMeasure example with default parameters and built-in default data.");
+			System.out.println("Provide parameters to read input data from files.");
+			System.out.println("Usage JaccardSimilarityMeasureExample <edge path> <output path>");
+		}
+
+		return true;
+	}
+
+	private static DataSet<Edge<Long, Double>> getEdgesDataSet(ExecutionEnvironment env) {
+
+		if(fileOutput) {
+			return env.readCsvFile(edgeInputPath)
+					.ignoreComments("#")
+					.fieldDelimiter("\t")
+					.lineDelimiter("\n")
+					.types(Long.class, Long.class)
+					.map(new MapFunction<Tuple2<Long, Long>, Edge<Long, Double>>() {
+						@Override
+						public Edge<Long, Double> map(Tuple2<Long, Long> tuple2) throws Exception {
+							return new Edge<Long, Double>(tuple2.f0, tuple2.f1, new Double(0));
+						}
+					});
+		} else {
+			return JaccardSimilarityMeasureData.getDefaultEdgeDataSet(env);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e281e4d6/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
new file mode 100644
index 0000000..7564b95
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/utils/JaccardSimilarityMeasureData.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.example.utils;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Provides the default data sets used for the Jaccard Similarity Measure example program.
+ * If no parameters are given to the program, the default data sets are used.
+ */
+public class JaccardSimilarityMeasureData {
+
+	public static final String EDGES = "1	2\n" + "1	3\n" + "1	4\n" + "1	5\n" + "2	3\n" + "2	4\n" +
+			"2	5\n" + "3	4\n" + "3	5\n" + "4	5";
+
+	public static DataSet<Edge<Long, Double>> getDefaultEdgeDataSet(ExecutionEnvironment env) {
+
+		List<Edge<Long, Double>> edges = new ArrayList<Edge<Long, Double>>();
+		edges.add(new Edge<Long, Double>(1L, 2L, new Double(0)));
+		edges.add(new Edge<Long, Double>(1L, 3L, new Double(0)));
+		edges.add(new Edge<Long, Double>(1L, 4L, new Double(0)));
+		edges.add(new Edge<Long, Double>(1L, 5L, new Double(0)));
+		edges.add(new Edge<Long, Double>(2L, 3L, new Double(0)));
+		edges.add(new Edge<Long, Double>(2L, 4L, new Double(0)));
+		edges.add(new Edge<Long, Double>(2L, 5L, new Double(0)));
+		edges.add(new Edge<Long, Double>(3L, 4L, new Double(0)));
+		edges.add(new Edge<Long, Double>(3L, 5L, new Double(0)));
+		edges.add(new Edge<Long, Double>(4L, 5L, new Double(0)));
+
+		return env.fromCollection(edges);
+	}
+
+	public static final String JACCARD_EDGES = "1,2,0.6\n" + "1,3,0.6\n" + "1,4,0.6\n" + "1,5,0.6\n" +
+			"2,3,0.6\n" + "2,4,0.6\n" + "2,5,0.6\n" + "3,4,0.6\n" + "3,5,0.6\n" + "4,5,0.6";
+
+	private JaccardSimilarityMeasureData() {}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/e281e4d6/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java
new file mode 100644
index 0000000..7269ed7
--- /dev/null
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/example/JaccardSimilarityMeasureExampleITCase.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.test.example;
+
+import com.google.common.base.Charsets;
+import com.google.common.io.Files;
+import org.apache.flink.graph.example.JaccardSimilarityMeasureExample;
+import org.apache.flink.graph.example.utils.JaccardSimilarityMeasureData;
+import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.io.File;
+
+@RunWith(Parameterized.class)
+public class JaccardSimilarityMeasureExampleITCase extends MultipleProgramsTestBase {
+
+	private String edgesPath;
+
+	private String debugResultPath;
+
+	private String resultPath;
+
+	private String expected;
+
+	@Rule
+	public TemporaryFolder tempFolder = new TemporaryFolder();
+
+	public JaccardSimilarityMeasureExampleITCase(TestExecutionMode mode) {
+		super(mode);
+	}
+
+	@Before
+	public void before() throws Exception {
+		resultPath = tempFolder.newFile().toURI().toString();
+
+		File edgesFile = tempFolder.newFile();
+		Files.write(JaccardSimilarityMeasureData.EDGES, edgesFile, Charsets.UTF_8);
+
+		edgesPath = edgesFile.toURI().toString();
+	}
+
+	@Test
+	public void testJaccardSimilarityMeasureExample() throws Exception {
+		JaccardSimilarityMeasureExample.main(new String[]{edgesPath, resultPath});
+		expected = JaccardSimilarityMeasureData.JACCARD_EDGES;
+	}
+
+	@After
+	public void after() throws Exception {
+		compareResultsByLinesInMemory(expected, resultPath);
+	}
+}


[4/4] flink git commit: [gelly] minor javadoc correction: the Graph is always directed.

Posted by va...@apache.org.
[gelly] minor javadoc correction: the Graph is always directed.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/c518df94
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/c518df94
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/c518df94

Branch: refs/heads/master
Commit: c518df944fd47ad0d0fda1fd6aaa7f8fac5b4b18
Parents: e281e4d
Author: vasia <va...@apache.org>
Authored: Fri Apr 10 11:25:59 2015 +0300
Committer: vasia <va...@apache.org>
Committed: Fri Apr 10 11:26:38 2015 +0300

----------------------------------------------------------------------
 .../flink-gelly/src/main/java/org/apache/flink/graph/Graph.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/c518df94/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index c84eb12..62173e3 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -68,7 +68,7 @@ import org.apache.flink.types.NullValue;
  * @see org.apache.flink.graph.Vertex
  * 
  * @param <K> the key type for edge and vertex identifiers
- * @param <VV> the value type for vertexes
+ * @param <VV> the value type for vertices
  * @param <EV> the value type for edges
  */
 @SuppressWarnings("serial")
@@ -79,8 +79,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	private final DataSet<Edge<K, EV>> edges;
 
 	/**
-	 * Creates a graph from two DataSets: vertices and edges and allow setting
-	 * the undirected property
+	 * Creates a graph from two DataSets: vertices and edges
 	 * 
 	 * @param vertices a DataSet of vertices.
 	 * @param edges a DataSet of edges.


[2/4] flink git commit: [FLINK-1694] [gelly] fixed typo and a small code simplification

Posted by va...@apache.org.
[FLINK-1694] [gelly] fixed typo and a small code simplification

This closes #547


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/48713a8a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/48713a8a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/48713a8a

Branch: refs/heads/master
Commit: 48713a8a8f328d3880c823a754b9ff6abc618b2a
Parents: e98bd85
Author: vasia <va...@gmail.com>
Authored: Thu Apr 9 22:04:55 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Fri Apr 10 10:54:46 2015 +0300

----------------------------------------------------------------------
 docs/gelly_guide.md                                          | 2 +-
 .../src/main/java/org/apache/flink/graph/Graph.java          | 8 ++------
 2 files changed, 3 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/48713a8a/docs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/gelly_guide.md b/docs/gelly_guide.md
index 203cffa..cc85296 100644
--- a/docs/gelly_guide.md
+++ b/docs/gelly_guide.md
@@ -373,7 +373,7 @@ and can be specified using the `setName()` method.
 
 * <strong>Parallelism</strong>: The parallelism for the iteration. It can be set using the `setParallelism()` method.	
 
-* <strong>Solution set in unmanaged memory</strong>: Defines whether the solution set is kept in managed memory (Flink's internal way of keeping object in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the `setSolutionSetUnmanagedMemory()` method.
+* <strong>Solution set in unmanaged memory</strong>: Defines whether the solution set is kept in managed memory (Flink's internal way of keeping objects in serialized form) or as a simple object map. By default, the solution set runs in managed memory. This property can be set using the `setSolutionSetUnmanagedMemory()` method.
 
 * <strong>Aggregators</strong>: Iteration aggregators can be registered using the `registerAggregator()` method. An iteration aggregator combines
 all aggregates globally once per superstep and makes them available in the next superstep. Registered aggregators can be accessed inside the user-defined `VertexUpdateFunction` and `MessagingFunction`.

http://git-wip-us.apache.org/repos/asf/flink/blob/48713a8a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 8280ba9..c84eb12 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -1165,12 +1165,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 			MessagingFunction<K, VV, M, EV> messagingFunction,
 			int maximumNumberOfIterations) {
 
-		VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
-				edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
-
-		DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
-
-		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
+		return this.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
+			maximumNumberOfIterations, null);
 	}
 
 	/**