Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:55 UTC

[17/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
new file mode 100644
index 0000000..afd4ffd
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricConfiguration.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.IterationConfiguration;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * A VertexCentricConfiguration object can be used to set the iteration name and
+ * degree of parallelism, to register aggregators and use broadcast sets in
+ * the {@link org.apache.flink.graph.spargel.VertexUpdateFunction} and {@link org.apache.flink.graph.spargel.MessagingFunction}.
+ *
+ * The VertexCentricConfiguration object is passed as an argument to
+ * {@link org.apache.flink.graph.Graph#runVertexCentricIteration(
+ * org.apache.flink.graph.spargel.VertexUpdateFunction, org.apache.flink.graph.spargel.MessagingFunction, int,
+ * VertexCentricConfiguration)}.
+ */
+public class VertexCentricConfiguration extends IterationConfiguration {
+
+	/** the broadcast variables for the update function **/
+	private List<Tuple2<String, DataSet<?>>> bcVarsUpdate = new ArrayList<Tuple2<String,DataSet<?>>>();
+
+	/** the broadcast variables for the messaging function **/
+	private List<Tuple2<String, DataSet<?>>> bcVarsMessaging = new ArrayList<Tuple2<String,DataSet<?>>>();
+
+	/** flag that defines whether the degrees option is set **/
+	private boolean optDegrees = false;
+
+	/** the direction in which the messages should be sent **/
+	private EdgeDirection direction = EdgeDirection.OUT;
+
+	public VertexCentricConfiguration() {}
+
+	/**
+	 * Adds a data set as a broadcast set to the messaging function.
+	 *
+	 * @param name The name under which the broadcast data is available in the messaging function.
+	 * @param data The data set to be broadcasted.
+	 */
+	public void addBroadcastSetForMessagingFunction(String name, DataSet<?> data) {
+		this.bcVarsMessaging.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Adds a data set as a broadcast set to the vertex update function.
+	 *
+	 * @param name The name under which the broadcast data is available in the vertex update function.
+	 * @param data The data set to be broadcasted.
+	 */
+	public void addBroadcastSetForUpdateFunction(String name, DataSet<?> data) {
+		this.bcVarsUpdate.add(new Tuple2<String, DataSet<?>>(name, data));
+	}
+
+	/**
+	 * Get the broadcast variables of the VertexUpdateFunction.
+	 *
+	 * @return a List of Tuple2, where the first field is the broadcast variable name
+	 * and the second field is the broadcast DataSet.
+	 */
+	public List<Tuple2<String, DataSet<?>>> getUpdateBcastVars() {
+		return this.bcVarsUpdate;
+	}
+
+	/**
+	 * Get the broadcast variables of the MessagingFunction.
+	 *
+	 * @return a List of Tuple2, where the first field is the broadcast variable name
+	 * and the second field is the broadcast DataSet.
+	 */
+	public List<Tuple2<String, DataSet<?>>> getMessagingBcastVars() {
+		return this.bcVarsMessaging;
+	}
+
+	/**
+	 * Gets whether the degrees option is set.
+	 * By default, the degrees option is not set.
+	 *
+	 * @return True, if the degree option is set, false otherwise.
+	 */
+	public boolean isOptDegrees() {
+		return optDegrees;
+	}
+
+	/**
+	 * Sets the degree option.
+	 * By default, the degrees option is not set.
+	 *
+	 * @param optDegrees True, to set this option, false otherwise.
+	 */
+	public void setOptDegrees(boolean optDegrees) {
+		this.optDegrees = optDegrees;
+	}
+
+	/**
+	 * Gets the direction in which messages are sent in the MessagingFunction.
+	 * By default the messaging direction is OUT.
+	 *
+	 * @return an EdgeDirection, which can be either IN, OUT or ALL.
+	 */
+	public EdgeDirection getDirection() {
+		return direction;
+	}
+
+	/**
+	 * Sets the direction in which messages are sent in the MessagingFunction.
+	 * By default the messaging direction is OUT.
+	 *
+	 * @param direction - IN, OUT or ALL
+	 */
+	public void setDirection(EdgeDirection direction) {
+		this.direction = direction;
+	}
+
+}
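
For illustration, a minimal usage sketch of this configuration class (hedged: the graph, the broadcast data set and the MyUpdater/MyMessenger functions below are assumed to exist and are not part of this commit; imports omitted):

    VertexCentricConfiguration parameters = new VertexCentricConfiguration();

    // make a data set available to the messaging function under the name "weights"
    parameters.addBroadcastSetForMessagingFunction("weights", weights);

    // give the update function access to in- and out-degrees, and send messages along incoming edges
    parameters.setOptDegrees(true);
    parameters.setDirection(EdgeDirection.IN);

    // pass the configuration to the iteration (MyUpdater / MyMessenger are hypothetical user functions)
    Graph<Long, Double, Double> result =
            graph.runVertexCentricIteration(new MyUpdater(), new MyMessenger(), 20, parameters);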

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
new file mode 100644
index 0000000..b3a470e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexCentricIteration.java
@@ -0,0 +1,686 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import java.util.Iterator;
+import java.util.Map;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.common.functions.RichCoGroupFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.operators.CoGroupOperator;
+import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class represents iterative graph computations, programmed in a vertex-centric perspective.
+ * It is a special case of <i>Bulk Synchronous Parallel</i> computation. The paradigm has also been
+ * implemented by Google's <i>Pregel</i> system and by <i>Apache Giraph</i>.
+ * <p>
+ * Vertex centric algorithms operate on graphs, which are defined through vertices and edges. The 
+ * algorithms send messages along the edges and update the state of vertices based on
+ * the old state and the incoming messages. All vertices have an initial state.
+ * The computation terminates once no vertex updates its state any more.
+ * Additionally, a maximum number of iterations (supersteps) may be specified.
+ * <p>
+ * The computation is here represented by two functions:
+ * <ul>
+ *   <li>The {@link VertexUpdateFunction} receives incoming messages and may update the state of
+ *   the vertex. If a state is updated, messages are sent from this vertex. Initially, all vertices are
+ *   considered updated.</li>
+ *   <li>The {@link MessagingFunction} takes the new vertex state and sends messages along the outgoing
+ *   edges of the vertex. The outgoing edges may optionally have an associated value, such as a weight.</li>
+ * </ul>
+ * <p>
+ *
+ * Vertex-centric graph iterations are run by calling
+ * {@link Graph#runVertexCentricIteration(VertexUpdateFunction, MessagingFunction, int)}.
+ *
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ * @param <EV> The type of the values that are associated with the edges.
+ */
+public class VertexCentricIteration<K, VV, Message, EV> 
+	implements CustomUnaryOperation<Vertex<K, VV>, Vertex<K, VV>>
+{
+	private final VertexUpdateFunction<K, VV, Message> updateFunction;
+
+	private final MessagingFunction<K, VV, Message, EV> messagingFunction;
+	
+	private final DataSet<Edge<K, EV>> edgesWithValue;
+	
+	private final int maximumNumberOfIterations;
+	
+	private final TypeInformation<Message> messageType;
+	
+	private DataSet<Vertex<K, VV>> initialVertices;
+
+	private VertexCentricConfiguration configuration;
+
+	// ----------------------------------------------------------------------------------
+	
+	private VertexCentricIteration(VertexUpdateFunction<K, VV, Message> uf,
+			MessagingFunction<K, VV, Message, EV> mf,
+			DataSet<Edge<K, EV>> edgesWithValue, 
+			int maximumNumberOfIterations)
+	{
+		Preconditions.checkNotNull(uf);
+		Preconditions.checkNotNull(mf);
+		Preconditions.checkNotNull(edgesWithValue);
+		Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+
+		this.updateFunction = uf;
+		this.messagingFunction = mf;
+		this.edgesWithValue = edgesWithValue;
+		this.maximumNumberOfIterations = maximumNumberOfIterations;		
+		this.messageType = getMessageType(mf);
+	}
+	
+	private TypeInformation<Message> getMessageType(MessagingFunction<K, VV, Message, EV> mf) {
+		return TypeExtractor.createTypeInfo(MessagingFunction.class, mf.getClass(), 2, null, null);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  Custom Operator behavior
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * Sets the input data set for this operator. In the case of this operator this input data set represents
+	 * the set of vertices with their initial state.
+	 * 
+	 * @param inputData The input data set, which in the case of this operator represents the set of
+	 *                  vertices with their initial state.
+	 * 
+	 * @see org.apache.flink.api.java.operators.CustomUnaryOperation#setInput(org.apache.flink.api.java.DataSet)
+	 */
+	@Override
+	public void setInput(DataSet<Vertex<K, VV>> inputData) {
+		this.initialVertices = inputData;
+	}
+	
+	/**
+	 * Creates the operator that represents this vertex-centric graph computation.
+	 * 
+	 * @return The operator that represents this vertex-centric graph computation.
+	 */
+	@Override
+	public DataSet<Vertex<K, VV>> createResult() {
+		if (this.initialVertices == null) {
+			throw new IllegalStateException("The input data set has not been set.");
+		}
+
+		// prepare some type information
+		TypeInformation<K> keyType = ((TupleTypeInfo<?>) initialVertices.getType()).getTypeAt(0);
+		TypeInformation<Tuple2<K, Message>> messageTypeInfo = new TupleTypeInfo<Tuple2<K,Message>>(keyType, messageType);
+
+		// create a graph
+		Graph<K, VV, EV> graph =
+				Graph.fromDataSet(initialVertices, edgesWithValue, initialVertices.getExecutionEnvironment());
+
+		// check whether the numVertices option is set and, if so, compute the total number of vertices
+		// and set it within the messaging and update functions
+
+		if (this.configuration != null && this.configuration.isOptNumVertices()) {
+			try {
+				long numberOfVertices = graph.numberOfVertices();
+				messagingFunction.setNumberOfVertices(numberOfVertices);
+				updateFunction.setNumberOfVertices(numberOfVertices);
+			} catch (Exception e) {
+				throw new RuntimeException("Could not compute the total number of vertices.", e);
+			}
+		}
+
+		if(this.configuration != null) {
+			messagingFunction.setDirection(this.configuration.getDirection());
+		} else {
+			messagingFunction.setDirection(EdgeDirection.OUT);
+		}
+
+		// retrieve the direction in which the updates are made and in which the messages are sent
+		EdgeDirection messagingDirection = messagingFunction.getDirection();
+
+		// check whether the degrees option is set and, if so, compute the in and the out degrees and
+		// add them to the vertex value
+		if(this.configuration != null && this.configuration.isOptDegrees()) {
+			return createResultVerticesWithDegrees(graph, messagingDirection, messageTypeInfo);
+		} else {
+			return createResultSimpleVertex(messagingDirection, messageTypeInfo);
+		}
+	}
+
+	/**
+	 * Creates a new vertex-centric iteration operator for graphs where the edges are associated with a value (such as
+	 * a weight or distance).
+	 * 
+	 * @param edgesWithValue The data set containing edges.
+	 * @param uf The function that updates the state of the vertices from the incoming messages.
+	 * @param mf The function that turns changed vertex states into messages along the edges.
+	 * @param maximumNumberOfIterations The maximum number of iterations (supersteps) to run.
+	 * 
+	 * @param <K> The type of the vertex key (the vertex identifier).
+	 * @param <VV> The type of the vertex value (the state of the vertex).
+	 * @param <Message> The type of the message sent between vertices along the edges.
+	 * @param <EV> The type of the values that are associated with the edges.
+	 * 
+	 * @return An instance of the vertex-centric graph computation operator.
+	 */
+	public static final <K, VV, Message, EV>
+			VertexCentricIteration<K, VV, Message, EV> withEdges(
+					DataSet<Edge<K, EV>> edgesWithValue,
+					VertexUpdateFunction<K, VV, Message> uf,
+					MessagingFunction<K, VV, Message, EV> mf,
+					int maximumNumberOfIterations)
+	{
+		return new VertexCentricIteration<K, VV, Message, EV>(uf, mf, edgesWithValue, maximumNumberOfIterations);
+	}
+
+	/**
+	 * Configures this vertex-centric iteration with the provided parameters.
+	 *
+	 * @param parameters the configuration parameters
+	 */
+	public void configure(VertexCentricConfiguration parameters) {
+		this.configuration = parameters;
+	}
+
+	/**
+	 * @return the configuration parameters of this vertex-centric iteration
+	 */
+	public VertexCentricConfiguration getIterationConfiguration() {
+		return this.configuration;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Wrapping UDFs
+	// --------------------------------------------------------------------------------------------
+
+	private static abstract class VertexUpdateUdf<K, VVWithDegrees, Message> extends RichCoGroupFunction<
+		Tuple2<K, Message>, Vertex<K, VVWithDegrees>, Vertex<K, VVWithDegrees>>
+		implements ResultTypeQueryable<Vertex<K, VVWithDegrees>>
+	{
+		private static final long serialVersionUID = 1L;
+		
+		final VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction;
+
+		final MessageIterator<Message> messageIter = new MessageIterator<Message>();
+		
+		private transient TypeInformation<Vertex<K, VVWithDegrees>> resultType;
+		
+		
+		private VertexUpdateUdf(VertexUpdateFunction<K, VVWithDegrees, Message> vertexUpdateFunction,
+				TypeInformation<Vertex<K, VVWithDegrees>> resultType)
+		{
+			this.vertexUpdateFunction = vertexUpdateFunction;
+			this.resultType = resultType;
+		}
+
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.vertexUpdateFunction.init(getIterationRuntimeContext());
+			}
+			this.vertexUpdateFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.vertexUpdateFunction.postSuperstep();
+		}
+
+		@Override
+		public TypeInformation<Vertex<K, VVWithDegrees>> getProducedType() {
+			return this.resultType;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class VertexUpdateUdfSimpleVV<K, VV, Message> extends VertexUpdateUdf<K, VV, Message> {
+
+		private VertexUpdateUdfSimpleVV(VertexUpdateFunction<K, VV, Message> vertexUpdateFunction, TypeInformation<Vertex<K, VV>> resultType) {
+			super(vertexUpdateFunction, resultType);
+		}
+
+		@Override
+		public void coGroup(Iterable<Tuple2<K, Message>> messages,
+							Iterable<Vertex<K, VV>> vertex,
+							Collector<Vertex<K, VV>> out) throws Exception {
+			final Iterator<Vertex<K, VV>> vertexIter = vertex.iterator();
+
+			if (vertexIter.hasNext()) {
+				Vertex<K, VV> vertexState = vertexIter.next();
+
+				@SuppressWarnings("unchecked")
+				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
+				messageIter.setSource(downcastIter);
+
+				vertexUpdateFunction.setOutput(vertexState, out);
+				vertexUpdateFunction.updateVertex(vertexState, messageIter);
+			}
+			else {
+				final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
+				if (messageIter.hasNext()) {
+					String message = "Target vertex does not exist!";
+					try {
+						Tuple2<K, Message> next = messageIter.next();
+						message = "Target vertex '" + next.f0 + "' does not exist!";
+					} catch (Throwable t) {}
+					throw new Exception(message);
+				} else {
+					throw new Exception();
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> {
+
+		private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message> vertexUpdateFunction,
+				TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> resultType) {
+			super(vertexUpdateFunction, resultType);
+		}
+		
+		@Override
+		public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> vertex,
+							Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
+
+			final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertexIter = vertex.iterator();
+		
+			if (vertexIter.hasNext()) {
+				Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = vertexIter.next();
+		
+				@SuppressWarnings("unchecked")
+				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
+				messageIter.setSource(downcastIter);
+
+				vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1);
+				vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2);
+
+				vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
+				vertexUpdateFunction.updateVertexFromVertexCentricIteration(vertexWithDegrees, messageIter);
+			}
+			else {
+				final Iterator<Tuple2<K, Message>> messageIter = messages.iterator();
+				if (messageIter.hasNext()) {
+					String message = "Target vertex does not exist!";
+					try {
+						Tuple2<K, Message> next = messageIter.next();
+						message = "Target vertex '" + next.f0 + "' does not exist!";
+					} catch (Throwable t) {}
+					throw new Exception(message);
+				} else {
+					throw new Exception();
+				}
+			}
+		}
+	}
+
+	/*
+	 * UDF that encapsulates the message sending function for graphs where the edges have an associated value.
+	 */
+	private static abstract class MessagingUdfWithEdgeValues<K, VVWithDegrees, VV, Message, EV>
+		extends RichCoGroupFunction<Edge<K, EV>, Vertex<K, VVWithDegrees>, Tuple2<K, Message>>
+		implements ResultTypeQueryable<Tuple2<K, Message>>
+	{
+		private static final long serialVersionUID = 1L;
+		
+		final MessagingFunction<K, VV, Message, EV> messagingFunction;
+		
+		private transient TypeInformation<Tuple2<K, Message>> resultType;
+	
+	
+		private MessagingUdfWithEdgeValues(MessagingFunction<K, VV, Message, EV> messagingFunction,
+				TypeInformation<Tuple2<K, Message>> resultType)
+		{
+			this.messagingFunction = messagingFunction;
+			this.resultType = resultType;
+		}
+		
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.messagingFunction.init(getIterationRuntimeContext());
+			}
+			
+			this.messagingFunction.preSuperstep();
+		}
+		
+		@Override
+		public void close() throws Exception {
+			this.messagingFunction.postSuperstep();
+		}
+		
+		@Override
+		public TypeInformation<Tuple2<K, Message>> getProducedType() {
+			return this.resultType;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>
+		extends MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> {
+
+		private MessagingUdfWithEVsSimpleVV(MessagingFunction<K, VV, Message, EV> messagingFunction,
+			TypeInformation<Tuple2<K, Message>> resultType) {
+			super(messagingFunction, resultType);
+		}
+
+		@Override
+		public void coGroup(Iterable<Edge<K, EV>> edges,
+							Iterable<Vertex<K, VV>> state,
+							Collector<Tuple2<K, Message>> out) throws Exception {
+			final Iterator<Vertex<K, VV>> stateIter = state.iterator();
+		
+			if (stateIter.hasNext()) {
+				Vertex<K, VV> newVertexState = stateIter.next();
+				messagingFunction.set((Iterator<?>) edges.iterator(), out);
+				messagingFunction.sendMessages(newVertexState);
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>
+		extends MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> {
+
+		private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
+
+		private MessagingUdfWithEVsVVWithDegrees(MessagingFunction<K, VV, Message, EV> messagingFunction,
+				TypeInformation<Tuple2<K, Message>> resultType) {
+			super(messagingFunction, resultType);
+		}
+
+		@Override
+		public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> state,
+				Collector<Tuple2<K, Message>> out) throws Exception {
+
+			final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> stateIter = state.iterator();
+		
+			if (stateIter.hasNext()) {
+				Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = stateIter.next();
+
+				nextVertex.setField(vertexWithDegrees.f0, 0);
+				nextVertex.setField(vertexWithDegrees.f1.f0, 1);
+
+				messagingFunction.setInDegree(vertexWithDegrees.f1.f1);
+				messagingFunction.setOutDegree(vertexWithDegrees.f1.f2);
+
+				messagingFunction.set((Iterator<?>) edges.iterator(), out);
+				messagingFunction.sendMessages(nextVertex);
+			}
+		}
+	}
+
+
+	// --------------------------------------------------------------------------------------------
+	//  UTIL methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Method that builds the messaging function using a coGroup operator for a simple vertex (without
+	 * degrees).
+	 * It afterwards configures the function with a custom name and broadcast variables.
+	 *
+	 * @param iteration the delta iteration whose workset holds the current vertex states
+	 * @param messageTypeInfo the type information of the produced messages
+	 * @param whereArg the argument for the where within the coGroup
+	 * @param equalToArg the argument for the equalTo within the coGroup
+	 * @return the messaging function
+	 */
+	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction(
+			DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration,
+			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+
+		// build the messaging function (co group)
+		CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
+		MessagingUdfWithEdgeValues<K, VV, VV, Message, EV> messenger =
+				new MessagingUdfWithEVsSimpleVV<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
+
+		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
+				.equalTo(equalToArg).with(messenger);
+
+		// configure coGroup message function with name and broadcast variables
+		messages = messages.name("Messaging");
+		if(this.configuration != null) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
+				messages = messages.withBroadcastSet(e.f1, e.f0);
+			}
+		}
+
+		return messages;
+	}
+
+	/**
+	 * Method that builds the messaging function using a coGroup operator for a vertex
+	 * containing degree information.
+	 * It afterwards configures the function with a custom name and broadcast variables.
+	 *
+	 * @param iteration the delta iteration whose workset holds the current vertex states (with degrees)
+	 * @param messageTypeInfo the type information of the produced messages
+	 * @param whereArg the argument for the where within the coGroup
+	 * @param equalToArg the argument for the equalTo within the coGroup
+	 * @return the messaging function
+	 */
+	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
+			DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration,
+			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+
+		// build the messaging function (co group)
+		CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
+		MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> messenger =
+				new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
+
+		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
+				.equalTo(equalToArg).with(messenger);
+
+		// configure coGroup message function with name and broadcast variables
+		messages = messages.name("Messaging");
+
+		if (this.configuration != null) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
+				messages = messages.withBroadcastSet(e.f1, e.f0);
+			}
+		}
+
+		return messages;
+	}
+
+	/**
+	 * Helper method which sets up an iteration with the given vertex value (either simple or with degrees).
+	 *
+	 * @param iteration the delta iteration to configure
+	 */
+
+	private void setUpIteration(DeltaIteration<?, ?> iteration) {
+
+		// set up the iteration operator
+		if (this.configuration != null) {
+
+			iteration.name(this.configuration.getName("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")"));
+			iteration.parallelism(this.configuration.getParallelism());
+			iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
+
+			// register all aggregators
+			for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
+				iteration.registerAggregator(entry.getKey(), entry.getValue());
+			}
+		}
+		else {
+			// no configuration provided; set default name
+			iteration.name("Vertex-centric iteration (" + updateFunction + " | " + messagingFunction + ")");
+		}
+	}
+
+	/**
+	 * Creates the operator that represents this vertex-centric graph computation for a simple vertex.
+	 *
+	 * @param messagingDirection the direction in which messages are sent
+	 * @param messageTypeInfo the type information of the messages
+	 * @return the operator
+	 */
+	private DataSet<Vertex<K, VV>> createResultSimpleVertex(EdgeDirection messagingDirection,
+		TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
+
+		DataSet<Tuple2<K, Message>> messages;
+
+		TypeInformation<Vertex<K, VV>> vertexTypes = initialVertices.getType();
+
+		final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
+				initialVertices.iterateDelta(initialVertices, this.maximumNumberOfIterations, 0);
+		setUpIteration(iteration);
+
+		switch (messagingDirection) {
+			case IN:
+				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0);
+				break;
+			case OUT:
+				messages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0);
+				break;
+			case ALL:
+				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0)
+						.union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0));
+				break;
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+
+		VertexUpdateUdf<K, VV, Message> updateUdf =
+				new VertexUpdateUdfSimpleVV<K, VV, Message>(updateFunction, vertexTypes);
+
+		// build the update function (co group)
+		CoGroupOperator<?, ?, Vertex<K, VV>> updates =
+				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+
+		configureUpdateFunction(updates);
+
+		return iteration.closeWith(updates, updates);
+	}
+
+	/**
+	 * Creates the operator that represents this vertex-centric graph computation for a vertex with in
+	 * and out degrees added to the vertex value.
+	 *
+	 * @param graph the input graph, used to compute the in- and out-degrees
+	 * @param messagingDirection the direction in which messages are sent
+	 * @param messageTypeInfo the type information of the messages
+	 * @return the operator
+	 */
+	@SuppressWarnings("serial")
+	private DataSet<Vertex<K, VV>> createResultVerticesWithDegrees(Graph<K, VV, EV> graph, EdgeDirection messagingDirection,
+			TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
+
+		DataSet<Tuple2<K, Message>> messages;
+
+		this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
+
+		DataSet<Tuple2<K, Long>> inDegrees = graph.inDegrees();
+		DataSet<Tuple2<K, Long>> outDegrees = graph.outDegrees();
+
+		DataSet<Tuple3<K, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
+				.with(new FlatJoinFunction<Tuple2<K, Long>, Tuple2<K, Long>, Tuple3<K, Long, Long>>() {
+
+					@Override
+					public void join(Tuple2<K, Long> first, Tuple2<K, Long> second,	Collector<Tuple3<K, Long, Long>> out) {
+						out.collect(new Tuple3<K, Long, Long>(first.f0, first.f1, second.f1));
+					}
+				}).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
+
+		DataSet<Vertex<K, Tuple3<VV, Long, Long>>> verticesWithDegrees = initialVertices
+				.join(degrees).where(0).equalTo(0)
+				.with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K,Long,Long>, Vertex<K, Tuple3<VV, Long, Long>>>() {
+					@Override
+					public void join(Vertex<K, VV> vertex, Tuple3<K, Long, Long> degrees,
+									Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
+
+						out.collect(new Vertex<K, Tuple3<VV, Long, Long>>(vertex.getId(),
+								new Tuple3<VV, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
+					}
+				}).withForwardedFieldsFirst("f0");
+
+		// add type info
+		TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
+
+		final DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration =
+				verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0);
+		setUpIteration(iteration);
+
+		switch (messagingDirection) {
+			case IN:
+				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0);
+				break;
+			case OUT:
+				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0);
+				break;
+			case ALL:
+				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0)
+						.union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0));
+				break;
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+
+		@SuppressWarnings({ "unchecked", "rawtypes" })
+		VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> updateUdf =
+				new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
+
+		// build the update function (co group)
+		CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
+				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+
+		configureUpdateFunction(updates);
+
+		return iteration.closeWith(updates, updates).map(
+				new MapFunction<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, VV>>() {
+
+					public Vertex<K, VV> map(Vertex<K, Tuple3<VV, Long, Long>> vertex) {
+						return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
+					}
+				});
+	}
+
+	private <VVWithDegree> void configureUpdateFunction(CoGroupOperator<?, ?, Vertex<K, VVWithDegree>> updates) {
+
+		// configure coGroup update function with name and broadcast variables
+		updates = updates.name("Vertex State Updates");
+		if (this.configuration != null) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getUpdateBcastVars()) {
+				updates = updates.withBroadcastSet(e.f1, e.f0);
+			}
+		}
+
+		// let the operator know that we preserve the key field
+		updates.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
+	}
+}
\ No newline at end of file
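
To make the two user functions concrete, here is a hedged single-source-shortest-paths style sketch. It assumes Long vertex ids, Double vertex values (distances) and Double edge weights, plus the Gelly MessagingFunction helpers getEdges() and sendMessageTo(), which are not part of this file; imports are omitted:

    // sends the candidate distance (own distance + edge weight) along every edge of the vertex
    public static final class MinDistanceMessenger extends MessagingFunction<Long, Double, Double, Double> {
        @Override
        public void sendMessages(Vertex<Long, Double> vertex) throws Exception {
            for (Edge<Long, Double> edge : getEdges()) {
                sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
            }
        }
    }

    // keeps the minimum of the current value and all incoming candidate distances
    public static final class DistanceUpdater extends VertexUpdateFunction<Long, Double, Double> {
        @Override
        public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception {
            double minDistance = Double.MAX_VALUE;
            for (Double msg : inMessages) {
                minDistance = Math.min(minDistance, msg);
            }
            // setting a new value is what triggers messaging in the next superstep
            if (minDistance < vertex.getValue()) {
                setNewVertexValue(minDistance);
            }
        }
    }

    // graph is an assumed Graph<Long, Double, Double>; the result carries the updated vertex values
    Graph<Long, Double, Double> shortestPaths =
            graph.runVertexCentricIteration(new DistanceUpdater(), new MinDistanceMessenger(), maxIterations);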

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
new file mode 100644
index 0000000..248925b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/VertexUpdateFunction.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * This class must be extended by functions that compute the state of the vertex depending on the old state and the
+ * incoming messages. The central method is {@link #updateVertex(Vertex, MessageIterator)}, which is
+ * invoked once per vertex per superstep.
+ * 
+ * @param <K> The vertex key type.
+ * @param <VV> The vertex value type.
+ * @param <Message> The message type.
+ */
+public abstract class VertexUpdateFunction<K, VV, Message> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	// --------------------------------------------------------------------------------------------
+	//  Attributes that allow vertices to access their in/out degrees and the total number of vertices
+	//  inside an iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private long numberOfVertices = -1L;
+
+	/**
+	 * Retrieves the number of vertices in the graph.
+	 * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getNumberOfVertices() {
+		return numberOfVertices;
+	}
+
+	void setNumberOfVertices(long numberOfVertices) {
+		this.numberOfVertices = numberOfVertices;
+	}
+
+	//---------------------------------------------------------------------------------------------
+
+	private boolean optDegrees;
+
+	boolean isOptDegrees() {
+		return optDegrees;
+	}
+
+	void setOptDegrees(boolean optDegrees) {
+		this.optDegrees = optDegrees;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Public API Methods
+	// --------------------------------------------------------------------------------------------
+	
+	/**
+	 * This method is invoked once per vertex per superstep. It receives the current state of the vertex, as well as
+	 * the incoming messages. It may set a new vertex state via {@link #setNewVertexValue(Object)}. If the vertex
+	 * state is changed, it will trigger the sending of messages via the {@link MessagingFunction}.
+	 * 
+	 * @param vertex The vertex.
+	 * @param inMessages The incoming messages to this vertex.
+	 * 
+	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
+	 */
+	public abstract void updateVertex(Vertex<K, VV> vertex, MessageIterator<Message> inMessages) throws Exception;
+	
+	/**
+	 * This method is executed once per superstep before the vertex update function is invoked for each vertex.
+	 * 
+	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+	 */
+	public void preSuperstep() throws Exception {}
+	
+	/**
+	 * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
+	 * 
+	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+	 */
+	public void postSuperstep() throws Exception {}
+	
+	/**
+	 * Sets the new value of this vertex. Setting a new value triggers the sending of outgoing messages from this vertex.
+	 *
+	 * This should be called at most once per updateVertex.
+	 * 
+	 * @param newValue The new vertex value.
+	 */
+	public void setNewVertexValue(VV newValue) {
+		if(setNewVertexValueCalled) {
+			throw new IllegalStateException("setNewVertexValue should only be called at most once per updateVertex");
+		}
+		setNewVertexValueCalled = true;
+		if(isOptDegrees()) {
+			outValWithDegrees.f1.f0 = newValue;
+			outWithDegrees.collect(outValWithDegrees);
+		} else {
+			outVal.setValue(newValue);
+			out.collect(outVal);
+		}
+	}
+	
+	/**
+	 * Gets the number of the superstep, starting at <tt>1</tt>.
+	 * 
+	 * @return The number of the current superstep.
+	 */
+	public int getSuperstepNumber() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+	
+	/**
+	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+	 * all aggregates globally once per superstep and makes them available in the next superstep.
+	 * 
+	 * @param name The name of the aggregator.
+	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
+	 */
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+	
+	/**
+	 * Get the aggregated value that an aggregator computed in the previous iteration.
+	 * 
+	 * @param name The name of the aggregator.
+	 * @return The aggregated value of the previous iteration.
+	 */
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+	
+	/**
+	 * Gets the broadcast data set registered under the given name. Broadcast data sets
+	 * are available on all parallel instances of a function. They can be registered via
+	 * {@link org.apache.flink.graph.spargel.VertexCentricConfiguration#addBroadcastSetForUpdateFunction(String, org.apache.flink.api.java.DataSet)}.
+	 * 
+	 * @param name The name under which the broadcast set is registered.
+	 * @return The broadcast data set.
+	 */
+	public <T> Collection<T> getBroadcastSet(String name) {
+		return this.runtimeContext.<T>getBroadcastVariable(name);
+	}
+	
+	// --------------------------------------------------------------------------------------------
+	//  internal methods
+	// --------------------------------------------------------------------------------------------
+	
+	private IterationRuntimeContext runtimeContext;
+
+	private Collector<Vertex<K, VV>> out;
+	
+	private Collector<Vertex<K, Tuple3<VV, Long, Long>>> outWithDegrees;
+
+	private Vertex<K, VV> outVal;
+
+	private Vertex<K, Tuple3<VV, Long, Long>> outValWithDegrees;
+
+	private long inDegree = -1;
+
+	private long outDegree = -1;
+
+	private boolean setNewVertexValueCalled;
+
+	void init(IterationRuntimeContext context) {
+		this.runtimeContext = context;
+	}
+
+	void setOutput(Vertex<K, VV> outVal, Collector<Vertex<K, VV>> out) {
+		this.outVal = outVal;
+		this.out = out;
+		setNewVertexValueCalled = false;
+	}
+
+	@SuppressWarnings({ "unchecked", "rawtypes" })
+	<ValueWithDegree> void setOutputWithDegrees(Vertex<K, ValueWithDegree> outVal,
+			Collector out) {
+		this.outValWithDegrees = (Vertex<K, Tuple3<VV, Long, Long>>) outVal;
+		this.outWithDegrees = out;
+		setNewVertexValueCalled = false;
+	}
+
+	/**
+	 * Retrieves the vertex in-degree (number of incoming edges).
+	 * @return The in-degree of this vertex if the {@link VertexCentricConfiguration#setOptDegrees(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getInDegree() {
+		return inDegree;
+	}
+
+	void setInDegree(long inDegree) {
+		this.inDegree = inDegree;
+	}
+
+	/**
+	 * Retrieves the vertex out-degree (number of outgoing edges).
+	 * @return The out-degree of this vertex if the {@link VertexCentricConfiguration#setOptDegrees(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getOutDegree() {
+		return outDegree;
+	}
+
+	void setOutDegree(long outDegree) {
+		this.outDegree = outDegree;
+	}
+
+	/**
+	 * In order to hide the Tuple3(actualValue, inDegree, outDegree) vertex value from the user,
+	 * another function will be called from {@link org.apache.flink.graph.spargel.VertexCentricIteration}.
+	 *
+	 * This function will retrieve the vertex from the vertexState and will set its degrees, afterwards calling
+	 * the regular updateVertex function.
+	 *
+	 * @param vertexState the vertex state, holding the value together with the in- and out-degree
+	 * @param inMessages the incoming messages to this vertex
+	 * @throws Exception
+	 */
+	@SuppressWarnings("unchecked")
+	<VertexWithDegree> void updateVertexFromVertexCentricIteration(Vertex<K, VertexWithDegree> vertexState,
+												MessageIterator<Message> inMessages) throws Exception {
+
+		Vertex<K, VV> vertex = new Vertex<K, VV>(vertexState.f0,
+				((Tuple3<VV, Long, Long>)vertexState.getValue()).f0);
+
+		updateVertex(vertex, inMessages);
+	}
+}
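
As a hedged illustration of the accessors above: the degree and vertex-count getters only return meaningful values when the corresponding options are enabled on the VertexCentricConfiguration (otherwise -1), e.g. in a PageRank-style update function:

    public static final class RankUpdater extends VertexUpdateFunction<Long, Double, Double> {
        @Override
        public void updateVertex(Vertex<Long, Double> vertex, MessageIterator<Double> inMessages) throws Exception {
            double sum = 0.0;
            for (Double partialRank : inMessages) {
                sum += partialRank;
            }
            // requires setOptNumVertices(true) in the configuration, otherwise -1 is returned
            long numVertices = getNumberOfVertices();
            if (numVertices > 0) {
                setNewVertexValue(0.15 / numVertices + 0.85 * sum);
            }
        }
    }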

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
new file mode 100644
index 0000000..0e085b4
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/EdgeToTuple3Map.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+
+@ForwardedFields("f0; f1; f2")
+public class EdgeToTuple3Map<K, EV> implements MapFunction<Edge<K, EV>, Tuple3<K, K, EV>> {
+
+	private static final long serialVersionUID = 1L;
+
+	public Tuple3<K, K, EV> map(Edge<K, EV> edge) {
+		return new Tuple3<K, K, EV>(edge.f0, edge.f1, edge.f2);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
new file mode 100644
index 0000000..2bd4719
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/NullValueEdgeMapper.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.types.NullValue;
+
+public class NullValueEdgeMapper<K, EV> implements MapFunction<Edge<K, EV>, NullValue> {
+
+	private static final long serialVersionUID = 1L;
+
+	public NullValue map(Edge<K, EV> edge) {
+		return NullValue.getInstance();
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
new file mode 100644
index 0000000..e51362b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple2ToVertexMap.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+
+@ForwardedFields("f0; f1")
+public class Tuple2ToVertexMap<K, VV> implements MapFunction<Tuple2<K, VV>, Vertex<K, VV>> {
+
+	private static final long serialVersionUID = 1L;
+
+	public Vertex<K, VV> map(Tuple2<K, VV> vertex) {
+		return new Vertex<K, VV>(vertex.f0, vertex.f1);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
new file mode 100644
index 0000000..0db9a51
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/Tuple3ToEdgeMap.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+
+/**
+ * Create an Edge DataSet from a Tuple3 DataSet.
+ *
+ * @param <K> the vertex key type
+ * @param <EV> the edge value type
+ */
+@ForwardedFields("f0; f1; f2")
+public class Tuple3ToEdgeMap<K, EV> implements MapFunction<Tuple3<K, K, EV>, Edge<K, EV>> {
+
+	private static final long serialVersionUID = 1L;
+
+	public Edge<K, EV> map(Tuple3<K, K, EV> tuple) {
+		return new Edge<K, EV>(tuple.f0, tuple.f1, tuple.f2);
+	}
+
+}
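
A short usage sketch (the Tuple3 input data set named edgeTuples is assumed):

    // turn a DataSet<Tuple3<Long, Long, Double>> of (source, target, value) records into Gelly edges
    DataSet<Edge<Long, Double>> edges = edgeTuples.map(new Tuple3ToEdgeMap<Long, Double>());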

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
new file mode 100644
index 0000000..04d1f47
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/VertexToTuple2Map.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.utils;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+
+@ForwardedFields("f0; f1")
+public class VertexToTuple2Map<K, VV> implements MapFunction<Vertex<K, VV>, Tuple2<K, VV>> {
+
+	private static final long serialVersionUID = 1L;
+
+	public Tuple2<K, VV> map(Vertex<K, VV> vertex) {
+		return new Tuple2<K, VV>(vertex.f0, vertex.f1);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
new file mode 100644
index 0000000..75b672c
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/GraphValidator.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
+
+import java.io.Serializable;
+
+import org.apache.flink.graph.Graph;
+
+/**
+ * A utility for defining validation criteria for different types of Graphs.
+ * 
+ * @param <K> the vertex key type
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+@SuppressWarnings("serial")
+public abstract class GraphValidator<K, VV, EV>	implements Serializable {
+
+	public abstract boolean validate(Graph<K, VV, EV> graph) throws Exception;
+
+}
\ No newline at end of file
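
Besides the InvalidVertexIdsValidator added below, users can plug in their own criteria by extending this class; a hedged sketch (the self-loop rule is only an example and not part of this commit; imports omitted):

    // accepts only graphs that contain no self-loops (edges whose source equals their target)
    public class NoSelfLoopsValidator<K, VV, EV> extends GraphValidator<K, VV, EV> {

        @Override
        public boolean validate(Graph<K, VV, EV> graph) throws Exception {
            return graph.getEdges()
                    .filter(new FilterFunction<Edge<K, EV>>() {
                        @Override
                        public boolean filter(Edge<K, EV> edge) {
                            return edge.getSource().equals(edge.getTarget());
                        }
                    })
                    .count() == 0;
        }
    }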

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
new file mode 100644
index 0000000..33d469b
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/validation/InvalidVertexIdsValidator.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.validation;
+
+import org.apache.flink.api.common.functions.CoGroupFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+
+@SuppressWarnings("serial")
+public class InvalidVertexIdsValidator<K, VV, EV> extends GraphValidator<K, VV, EV> {
+
+	/**
+	 * Checks that the edge set input contains valid vertex Ids, i.e. that they
+	 * also exist in the vertex input set.
+	 * 
+	 * @return a boolean stating whether a graph is valid
+	 *         with respect to its vertex ids.
+	 */
+	@Override
+	public boolean validate(Graph<K, VV, EV> graph) throws Exception {
+		DataSet<Tuple1<K>> edgeIds = graph.getEdges()
+				.flatMap(new MapEdgeIds<K, EV>()).distinct();
+		DataSet<K> invalidIds = graph.getVertices().coGroup(edgeIds).where(0)
+				.equalTo(0).with(new GroupInvalidIds<K, VV>()).first(1);
+
+		return invalidIds.map(new KToTupleMap<K>()).count() == 0;
+	}
+
+	private static final class MapEdgeIds<K, EV> implements FlatMapFunction<Edge<K, EV>, Tuple1<K>> {
+		public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
+			out.collect(new Tuple1<K>(edge.f0));
+			out.collect(new Tuple1<K>(edge.f1));
+		}
+	}
+
+	private static final class GroupInvalidIds<K, VV> implements CoGroupFunction<Vertex<K, VV>, Tuple1<K>, K> {
+		public void coGroup(Iterable<Vertex<K, VV>> vertexId,
+				Iterable<Tuple1<K>> edgeId, Collector<K> out) {
+			if (!(vertexId.iterator().hasNext())) {
+				// found an id that doesn't exist in the vertex set
+				out.collect(edgeId.iterator().next().f0);
+			}
+		}
+	}
+
+	private static final class KToTupleMap<K> implements MapFunction<K, Tuple1<K>> {
+		public Tuple1<K> map(K key) throws Exception {
+			return new Tuple1<K>(key);
+		}
+	}
+
+}
\ No newline at end of file
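
Usage-wise, a validator is passed to Graph#validate(GraphValidator), which runs the check and returns the result. A minimal, hypothetical driver (not part of this commit), assuming the usual Gelly entry points Graph.fromDataSet and Graph#validate:

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.validation.InvalidVertexIdsValidator;
import org.apache.flink.types.NullValue;

// hypothetical driver, for illustration only
public class ValidateExample {

	public static void main(String[] args) throws Exception {
		ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

		DataSet<Vertex<Long, Long>> vertices = env.fromElements(
				new Vertex<Long, Long>(1L, 1L), new Vertex<Long, Long>(2L, 2L));

		// edge (1 -> 3) references vertex id 3, which is not in the vertex set
		DataSet<Edge<Long, NullValue>> edges = env.fromElements(
				new Edge<Long, NullValue>(1L, 3L, NullValue.getInstance()));

		Graph<Long, Long, NullValue> graph = Graph.fromDataSet(vertices, edges, env);

		// expected to print "false" because of the dangling edge target
		System.out.println(graph.validate(new InvalidVertexIdsValidator<Long, Long, NullValue>()));
	}
}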

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
new file mode 100644
index 0000000..2ad203f
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSACompilerTest.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+import org.apache.flink.optimizer.dataproperties.PartitioningProperty;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SingleInputPlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+public class GSACompilerTest extends CompilerTestBase {
+
+	private static final long serialVersionUID = 1L;
+
+	@Test
+	public void testGSACompiler() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(DEFAULT_PARALLELISM);
+			// compose test program
+			{
+				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
+						1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
+
+				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
+
+				DataSet<Vertex<Long, Long>> result = graph.runGatherSumApplyIteration(
+						new GatherNeighborIds(), new SelectMinId(),
+						new UpdateComponentId(), 100).getVertices();
+				
+				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+			}
+			
+			Plan p = env.createProgramPlan("GSA Connected Components");
+			OptimizedPlan op = compileNoStats(p);
+			
+			// check the sink
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+			assertEquals(PartitioningProperty.HASH_PARTITIONED, sink.getGlobalProperties().getPartitioning());
+			
+			// check the iteration
+			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+			
+			// check the solution set join and the delta
+			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
+			
+			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
+			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+			
+			// check the workset join
+			SingleInputPlanNode sumReducer = (SingleInputPlanNode) ssJoin.getInput1().getSource();
+			SingleInputPlanNode gatherMapper = (SingleInputPlanNode) sumReducer.getInput().getSource();
+			DualInputPlanNode edgeJoin = (DualInputPlanNode) gatherMapper.getInput().getSource(); 
+			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
+			// input1 is the workset
+			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput1().getShipStrategy());
+			// input2 is the edges
+			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput2().getShipStrategy());
+			assertTrue(edgeJoin.getInput2().getTempMode().isCached());
+
+			assertEquals(new FieldList(0), edgeJoin.getInput2().getShipStrategyKeys());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Long> {
+
+		public Long map(Long vertexId) {
+			return vertexId;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
+
+		public Long gather(Neighbor<Long, NullValue> neighbor) {
+			return neighbor.getNeighborValue();
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
+
+		public Long sum(Long newValue, Long currentValue) {
+			return Math.min(newValue, currentValue);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
+
+		public void apply(Long summedValue, Long origValue) {
+			if (summedValue < origValue) {
+				setResult(summedValue);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
new file mode 100644
index 0000000..ced7508
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/gsa/GSATranslationTest.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.aggregators.LongSumAggregator;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.DeltaIterationResultSet;
+import org.apache.flink.api.java.operators.SingleInputUdfOperator;
+import org.apache.flink.api.java.operators.TwoInputUdfOperator;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+public class GSATranslationTest {
+
+	@Test
+	public void testTranslation() {
+		try {
+			final String ITERATION_NAME = "Test Name";
+			
+			final String AGGREGATOR_NAME = "AggregatorName";
+			
+			final String BC_SET_GATHER_NAME = "gather messages";
+			
+			final String BC_SET_SUM_NAME = "sum updates";
+
+			final String BC_SET_APPLY_NAME = "apply updates";
+
+			final int NUM_ITERATIONS = 13;
+			
+			final int ITERATION_parallelism = 77;
+			
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			
+			DataSet<Long> bcGather = env.fromElements(1L);
+			DataSet<Long> bcSum = env.fromElements(1L);
+			DataSet<Long> bcApply = env.fromElements(1L);
+
+			DataSet<Vertex<Long, Long>> result;
+
+			// ------------ construct the test program ------------------
+			{
+
+				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple3<Long, Long, NullValue>(
+						1L, 2L, NullValue.getInstance())).map(new Tuple3ToEdgeMap<Long, NullValue>());
+
+				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(edges, new InitVertices(), env);
+
+				GSAConfiguration parameters = new GSAConfiguration();
+
+				parameters.registerAggregator(AGGREGATOR_NAME, new LongSumAggregator());
+				parameters.setName(ITERATION_NAME);
+				parameters.setParallelism(ITERATION_parallelism);
+				parameters.addBroadcastSetForGatherFunction(BC_SET_GATHER_NAME, bcGather);
+				parameters.addBroadcastSetForSumFunction(BC_SET_SUM_NAME, bcSum);
+				parameters.addBroadcastSetForApplyFunction(BC_SET_APPLY_NAME, bcApply);
+
+				result = graph.runGatherSumApplyIteration(
+						new GatherNeighborIds(), new SelectMinId(),
+						new UpdateComponentId(), NUM_ITERATIONS, parameters).getVertices();
+				
+				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+			}
+			
+			
+			// ------------- validate the java program ----------------
+			
+			assertTrue(result instanceof DeltaIterationResultSet);
+			
+			DeltaIterationResultSet<?, ?> resultSet = (DeltaIterationResultSet<?, ?>) result;
+			DeltaIteration<?, ?> iteration = (DeltaIteration<?, ?>) resultSet.getIterationHead();
+			
+			// check the basic iteration properties
+			assertEquals(NUM_ITERATIONS, resultSet.getMaxIterations());
+			assertArrayEquals(new int[] {0}, resultSet.getKeyPositions());
+			assertEquals(ITERATION_parallelism, iteration.getParallelism());
+			assertEquals(ITERATION_NAME, iteration.getName());
+			
+			assertEquals(AGGREGATOR_NAME, iteration.getAggregators().getAllRegisteredAggregators().iterator().next().getName());
+			
+			// validate that the semantic properties are set as they should
+			TwoInputUdfOperator<?, ?, ?, ?> solutionSetJoin = (TwoInputUdfOperator<?, ?, ?, ?>) resultSet.getNextWorkset();
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(0, 0).contains(0));
+			assertTrue(solutionSetJoin.getSemanticProperties().getForwardingTargetFields(1, 0).contains(0));
+
+			SingleInputUdfOperator<?, ?, ?> sumReduce = (SingleInputUdfOperator<?, ?, ?>) solutionSetJoin.getInput1();
+			SingleInputUdfOperator<?, ?, ?> gatherMap = (SingleInputUdfOperator<?, ?, ?>) sumReduce.getInput();
+
+			// validate that the broadcast sets are forwarded
+			assertEquals(bcGather, gatherMap.getBroadcastSets().get(BC_SET_GATHER_NAME));
+			assertEquals(bcSum, sumReduce.getBroadcastSets().get(BC_SET_SUM_NAME));
+			assertEquals(bcApply, solutionSetJoin.getBroadcastSets().get(BC_SET_APPLY_NAME));
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class InitVertices implements MapFunction<Long, Long> {
+
+		public Long map(Long vertexId) {
+			return vertexId;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
+
+		public Long gather(Neighbor<Long, NullValue> neighbor) {
+			return neighbor.getNeighborValue();
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
+
+		public Long sum(Long newValue, Long currentValue) {
+			return Math.min(newValue, currentValue);
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class UpdateComponentId extends ApplyFunction<Long, Long, Long> {
+
+		public void apply(Long summedValue, Long origValue) {
+			if (summedValue < origValue) {
+				setResult(summedValue);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
new file mode 100644
index 0000000..7a8143a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/spargel/SpargelCompilerTest.java
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.graph.spargel;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import org.apache.flink.api.common.Plan;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.util.FieldList;
+import org.apache.flink.api.java.io.DiscardingOutputFormat;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.optimizer.util.CompilerTestBase;
+import org.junit.Test;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.optimizer.plan.DualInputPlanNode;
+import org.apache.flink.optimizer.plan.OptimizedPlan;
+import org.apache.flink.optimizer.plan.PlanNode;
+import org.apache.flink.optimizer.plan.SinkPlanNode;
+import org.apache.flink.optimizer.plan.WorksetIterationPlanNode;
+import org.apache.flink.runtime.operators.shipping.ShipStrategyType;
+import org.apache.flink.runtime.operators.util.LocalStrategy;
+import org.apache.flink.types.NullValue;
+import org.apache.flink.graph.library.ConnectedComponents;
+import org.apache.flink.graph.utils.Tuple2ToVertexMap;
+
+
+public class SpargelCompilerTest extends CompilerTestBase {
+
+	private static final long serialVersionUID = 1L;
+
+	@SuppressWarnings("serial")
+	@Test
+	public void testSpargelCompiler() {
+		try {
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(DEFAULT_PARALLELISM);
+			// compose test program
+			{
+
+				DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+						new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
+						.map(new Tuple2ToVertexMap<Long, Long>());
+
+				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
+					.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
+
+						public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+							return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+						}
+				});
+
+				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+				
+				DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
+						new ConnectedComponents.CCUpdater<Long>(),
+						new ConnectedComponents.CCMessenger<Long>(), 100)
+						.getVertices();
+				
+				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+			}
+			
+			Plan p = env.createProgramPlan("Spargel Connected Components");
+			OptimizedPlan op = compileNoStats(p);
+			
+			// check the sink
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+			
+			// check the iteration
+			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+			
+			// check the solution set join and the delta
+			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
+			
+			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
+			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+			
+			// check the workset join
+			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
+			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+			
+			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+			
+			// check that the initial partitioning is pushed out of the loop
+			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
+			
+			// check that the initial workset sort is outside the loop
+			assertEquals(LocalStrategy.SORT, iteration.getInput2().getLocalStrategy());
+			assertEquals(new FieldList(0), iteration.getInput2().getLocalStrategyKeys());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+	
+	@SuppressWarnings("serial")
+	@Test
+	public void testSpargelCompilerWithBroadcastVariable() {
+		try {
+			final String BC_VAR_NAME = "borat variable";
+			
+			
+			ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+			env.setParallelism(DEFAULT_PARALLELISM);
+			// compose test program
+			{
+				DataSet<Long> bcVar = env.fromElements(1L);
+
+				DataSet<Vertex<Long, Long>> initialVertices = env.fromElements(
+						new Tuple2<Long, Long>(1L, 1L), new Tuple2<Long, Long>(2L, 2L))
+						.map(new Tuple2ToVertexMap<Long, Long>());
+
+				DataSet<Edge<Long, NullValue>> edges = env.fromElements(new Tuple2<Long, Long>(1L, 2L))
+						.map(new MapFunction<Tuple2<Long,Long>, Edge<Long, NullValue>>() {
+
+							public Edge<Long, NullValue> map(Tuple2<Long, Long> edge) {
+								return new Edge<Long, NullValue>(edge.f0, edge.f1, NullValue.getInstance());
+							}
+					});
+
+				Graph<Long, Long, NullValue> graph = Graph.fromDataSet(initialVertices, edges, env);
+
+				VertexCentricConfiguration parameters = new VertexCentricConfiguration();
+				parameters.addBroadcastSetForMessagingFunction(BC_VAR_NAME, bcVar);
+				parameters.addBroadcastSetForUpdateFunction(BC_VAR_NAME, bcVar);
+
+				DataSet<Vertex<Long, Long>> result = graph.runVertexCentricIteration(
+						new ConnectedComponents.CCUpdater<Long>(),
+						new ConnectedComponents.CCMessenger<Long>(), 100, parameters)
+						.getVertices();
+					
+				result.output(new DiscardingOutputFormat<Vertex<Long, Long>>());
+
+			}
+			
+			Plan p = env.createProgramPlan("Spargel Connected Components");
+			OptimizedPlan op = compileNoStats(p);
+			
+			// check the sink
+			SinkPlanNode sink = op.getDataSinks().iterator().next();
+			assertEquals(ShipStrategyType.FORWARD, sink.getInput().getShipStrategy());
+			assertEquals(DEFAULT_PARALLELISM, sink.getParallelism());
+			
+			// check the iteration
+			WorksetIterationPlanNode iteration = (WorksetIterationPlanNode) sink.getInput().getSource();
+			assertEquals(DEFAULT_PARALLELISM, iteration.getParallelism());
+			
+			// check the solution set join and the delta
+			PlanNode ssDelta = iteration.getSolutionSetDeltaPlanNode();
+			assertTrue(ssDelta instanceof DualInputPlanNode); // this is only true if the update function preserves the partitioning
+			
+			DualInputPlanNode ssJoin = (DualInputPlanNode) ssDelta;
+			assertEquals(DEFAULT_PARALLELISM, ssJoin.getParallelism());
+			assertEquals(ShipStrategyType.PARTITION_HASH, ssJoin.getInput1().getShipStrategy());
+			assertEquals(new FieldList(0), ssJoin.getInput1().getShipStrategyKeys());
+			
+			// check the workset join
+			DualInputPlanNode edgeJoin = (DualInputPlanNode) ssJoin.getInput1().getSource();
+			assertEquals(DEFAULT_PARALLELISM, edgeJoin.getParallelism());
+			assertEquals(ShipStrategyType.PARTITION_HASH, edgeJoin.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.FORWARD, edgeJoin.getInput2().getShipStrategy());
+			assertTrue(edgeJoin.getInput1().getTempMode().isCached());
+			
+			assertEquals(new FieldList(0), edgeJoin.getInput1().getShipStrategyKeys());
+			
+			// check that the initial partitioning is pushed out of the loop
+			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput1().getShipStrategy());
+			assertEquals(ShipStrategyType.PARTITION_HASH, iteration.getInput2().getShipStrategy());
+			assertEquals(new FieldList(0), iteration.getInput1().getShipStrategyKeys());
+			assertEquals(new FieldList(0), iteration.getInput2().getShipStrategyKeys());
+		}
+		catch (Exception e) {
+			System.err.println(e.getMessage());
+			e.printStackTrace();
+			fail(e.getMessage());
+		}
+	}
+}