You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:56 UTC

[18/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
new file mode 100755
index 0000000..23ccb68
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -0,0 +1,425 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.RichFlatJoinFunction;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.common.functions.RichReduceFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.operators.DeltaIteration;
+import org.apache.flink.api.java.operators.JoinOperator;
+import org.apache.flink.api.java.operators.MapOperator;
+import org.apache.flink.api.java.operators.ReduceOperator;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
+import org.apache.flink.api.java.typeutils.TupleTypeInfo;
+import org.apache.flink.api.java.typeutils.TypeExtractor;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.util.Collector;
+import java.util.Map;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * This class represents iterative graph computations, programmed in a gather-sum-apply perspective.
+ *
+ * @param <K> The type of the vertex key in the graph
+ * @param <VV> The type of the vertex value in the graph
+ * @param <EV> The type of the edge value in the graph
+ * @param <M> The intermediate type used by the gather, sum and apply functions
+ */
+public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperation<Vertex<K, VV>,
+		Vertex<K, VV>> {
+
+	private DataSet<Vertex<K, VV>> vertexDataSet;
+	private DataSet<Edge<K, EV>> edgeDataSet;
+
+	private final GatherFunction<VV, EV, M> gather;
+	private final SumFunction<VV, EV, M> sum;
+	private final ApplyFunction<K, VV, M> apply;
+	private final int maximumNumberOfIterations;
+	private EdgeDirection direction = EdgeDirection.OUT;
+
+	private GSAConfiguration configuration;
+
+	// ----------------------------------------------------------------------------------
+
+	private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
+			ApplyFunction<K, VV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
+		// Fail fast: the three functions and the edge set are mandatory, and at least one iteration must run.
+		Preconditions.checkNotNull(gather);
+		Preconditions.checkNotNull(sum);
+		Preconditions.checkNotNull(apply);
+		Preconditions.checkNotNull(edges);
+		Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
+		// Arguments are valid; store them. The vertex input is not known yet, it arrives through setInput().
+		this.gather = gather;
+		this.sum = sum;
+		this.apply = apply;
+		this.edgeDataSet = edges;
+		this.maximumNumberOfIterations = maximumNumberOfIterations;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Custom Operator behavior
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Sets the input data set for this operator: the set of vertices with their initial state.
+	 * It has to be set before {@link #createResult()} is called, which otherwise throws an IllegalStateException.
+	 *
+	 * @param dataSet The vertex data set to iterate on. It is stored as-is, without validation;
+	 *                a null value is only detected later, in createResult().
+	 */
+	@Override
+	public void setInput(DataSet<Vertex<K, VV>> dataSet) {
+		this.vertexDataSet = dataSet;
+	}
+
+	/**
+	 * Computes the results of the gather-sum-apply iteration; throws IllegalStateException if no input was set.
+	 * @return The resulting DataSet
+	 * @throws RuntimeException if the number of vertices was requested but could not be computed
+	 */
+	@Override
+	public DataSet<Vertex<K, VV>> createResult() {
+		if (vertexDataSet == null) {
+			throw new IllegalStateException("The input data set has not been set.");
+		}
+
+		// Prepare type information (index 2 selects M, the third type parameter of GatherFunction<VV, EV, M>)
+		TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertexDataSet.getType()).getTypeAt(0);
+		TypeInformation<M> messageType = TypeExtractor.createTypeInfo(GatherFunction.class, gather.getClass(), 2, null, null);
+		TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<Tuple2<K, M>>(keyType, messageType);
+		TypeInformation<Vertex<K, VV>> outputType = vertexDataSet.getType();
+
+		// create a graph
+		Graph<K, VV, EV> graph =
+				Graph.fromDataSet(vertexDataSet, edgeDataSet, vertexDataSet.getExecutionEnvironment());
+
+		// check whether the numVertices option is set and, if so, compute the total number of vertices
+		// and set it within the gather, sum and apply functions
+		if (this.configuration != null && this.configuration.isOptNumVertices()) {
+			try {
+				long numberOfVertices = graph.numberOfVertices();
+				gather.setNumberOfVertices(numberOfVertices);
+				sum.setNumberOfVertices(numberOfVertices);
+				apply.setNumberOfVertices(numberOfVertices);
+			} catch (Exception e) {
+				throw new RuntimeException("Could not compute the number of vertices for the gather-sum-apply iteration.", e);
+			}
+		}
+
+		// Prepare UDFs
+		GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<K, VV, EV, M>(gather, innerType);
+		SumUdf<K, VV, EV, M> sumUdf = new SumUdf<K, VV, EV, M>(sum, innerType);
+		ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<K, VV, EV, M>(apply, outputType);
+
+		final int[] zeroKeyPos = new int[] {0};
+		final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
+				vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
+
+		// set up the iteration operator
+		if (this.configuration != null) {
+
+			iteration.name(this.configuration.getName(
+					"Gather-sum-apply iteration (" + gather + " | " + sum + " | " + apply + ")"));
+			iteration.parallelism(this.configuration.getParallelism());
+			iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
+
+			// register all aggregators
+			for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
+				iteration.registerAggregator(entry.getKey(), entry.getValue());
+			}
+		}
+		else {
+			// no configuration provided; set default name
+			iteration.name("Gather-sum-apply iteration (" + gather + " | " + sum + " | " + apply + ")");
+		}
+
+		// Prepare the neighbors; a configured direction overrides the field default (OUT)
+		if(this.configuration != null) {
+			direction = this.configuration.getDirection();
+		}
+		DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors;
+		switch(direction) {
+			case OUT:
+				neighbors = iteration
+				.getWorkset().join(edgeDataSet)
+				.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
+				break;
+			case IN:
+				neighbors = iteration
+				.getWorkset().join(edgeDataSet)
+				.where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>());
+				break;
+			case ALL: // union of the OUT join and the IN join
+				neighbors =  iteration
+						.getWorkset().join(edgeDataSet)
+						.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>()).union(iteration
+								.getWorkset().join(edgeDataSet)
+								.where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>()));
+				break;
+			default: // any other direction falls back to the OUT behavior
+				neighbors = iteration
+						.getWorkset().join(edgeDataSet)
+						.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
+				break;
+		}
+
+		// Gather, sum and apply
+		MapOperator<Tuple2<K, Neighbor<VV, EV>>, Tuple2<K, M>> gatherMapOperator = neighbors.map(gatherUdf);
+
+		// configure map gather function with name and broadcast variables
+		gatherMapOperator = gatherMapOperator.name("Gather");
+
+		if (this.configuration != null) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getGatherBcastVars()) {
+				gatherMapOperator = gatherMapOperator.withBroadcastSet(e.f1, e.f0);
+			}
+		}
+		DataSet<Tuple2<K, M>> gatheredSet = gatherMapOperator;
+
+		ReduceOperator<Tuple2<K, M>> sumReduceOperator = gatheredSet.groupBy(0).reduce(sumUdf);
+
+		// configure reduce sum function with name and broadcast variables
+		sumReduceOperator = sumReduceOperator.name("Sum");
+
+		if (this.configuration != null) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getSumBcastVars()) {
+				sumReduceOperator = sumReduceOperator.withBroadcastSet(e.f1, e.f0);
+			}
+		}
+		DataSet<Tuple2<K, M>> summedSet = sumReduceOperator;
+
+		JoinOperator<?, ?, Vertex<K, VV>> appliedSet = summedSet
+				.join(iteration.getSolutionSet())
+				.where(0)
+				.equalTo(0)
+				.with(applyUdf);
+
+		// configure join apply function with name and broadcast variables
+		appliedSet = appliedSet.name("Apply");
+
+		if (this.configuration != null) {
+			for (Tuple2<String, DataSet<?>> e : this.configuration.getApplyBcastVars()) {
+				appliedSet = appliedSet.withBroadcastSet(e.f1, e.f0);
+			}
+		}
+
+		// let the operator know that we preserve the key field
+		appliedSet.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
+
+		return iteration.closeWith(appliedSet, appliedSet);
+	}
+
+	/**
+	 * Creates a new gather-sum-apply iteration operator for graphs
+	 *
+	 * @param edges The edge DataSet
+	 *
+	 * @param gather The gather function of the GSA iteration
+	 * @param sum The sum function of the GSA iteration
+	 * @param apply The apply function of the GSA iteration
+	 *
+	 * @param maximumNumberOfIterations The maximum number of iterations executed; must be greater than zero
+	 *
+	 * @param <K> The type of the vertex key in the graph
+	 * @param <VV> The type of the vertex value in the graph
+	 * @param <EV> The type of the edge value in the graph
+	 * @param <M> The intermediate type used by the gather, sum and apply functions
+	 *
+	 * @return An instance of the gather-sum-apply graph computation operator.
+	 */
+	public static final <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M>
+		withEdges(DataSet<Edge<K, EV>> edges, GatherFunction<VV, EV, M> gather,
+		SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply, int maximumNumberOfIterations) {
+
+		return new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Wrapping UDFs
+	// --------------------------------------------------------------------------------------------
+
+	@SuppressWarnings("serial")
+	@ForwardedFields("f0")
+	private static final class GatherUdf<K, VV, EV, M> extends RichMapFunction<Tuple2<K, Neighbor<VV, EV>>,
+			Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
+		// Map UDF that applies the user-defined gather function to every (key, neighbor) pair.
+		private final GatherFunction<VV, EV, M> gatherFunction;
+		private transient TypeInformation<Tuple2<K, M>> resultType; // transient: not serialized with the function
+
+		private GatherUdf(GatherFunction<VV, EV, M> gatherFunction, TypeInformation<Tuple2<K, M>> resultType) {
+			this.gatherFunction = gatherFunction;
+			this.resultType = resultType;
+		}
+		// Keeps the key (f0) and replaces the neighbor with the value computed by gather().
+		@Override
+		public Tuple2<K, M> map(Tuple2<K, Neighbor<VV, EV>> neighborTuple) {
+			M result = this.gatherFunction.gather(neighborTuple.f1);
+			return new Tuple2<K, M>(neighborTuple.f0, result);
+		}
+		// Hands the runtime context to the user function in superstep 1 only; preSuperstep() runs on every open().
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.gatherFunction.init(getIterationRuntimeContext());
+			}
+			this.gatherFunction.preSuperstep();
+		}
+		// Runs the user function's post-superstep hook when this UDF is closed.
+		@Override
+		public void close() throws Exception {
+			this.gatherFunction.postSuperstep();
+		}
+		// Reports the result type that was passed to the constructor.
+		@Override
+		public TypeInformation<Tuple2<K, M>> getProducedType() {
+			return this.resultType;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumUdf<K, VV, EV, M> extends RichReduceFunction<Tuple2<K, M>>
+			implements ResultTypeQueryable<Tuple2<K, M>>{
+		// Reduce UDF that combines gathered values pairwise with the user-defined sum function.
+		private final SumFunction<VV, EV, M> sumFunction;
+		private transient TypeInformation<Tuple2<K, M>> resultType; // transient: not serialized with the function
+
+		private SumUdf(SumFunction<VV, EV, M> sumFunction, TypeInformation<Tuple2<K, M>> resultType) {
+			this.sumFunction = sumFunction;
+			this.resultType = resultType;
+		}
+		// Combines the two values; the key is taken from the first argument (the reduce is applied per key group).
+		@Override
+		public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
+			K key = arg0.f0;
+			M result = this.sumFunction.sum(arg0.f1, arg1.f1);
+			return new Tuple2<K, M>(key, result);
+		}
+		// Hands the runtime context to the user function in superstep 1 only; preSuperstep() runs on every open().
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.sumFunction.init(getIterationRuntimeContext());
+			}
+			this.sumFunction.preSuperstep();
+		}
+		// Runs the user function's post-superstep hook when this UDF is closed.
+		@Override
+		public void close() throws Exception {
+			this.sumFunction.postSuperstep();
+		}
+		// Reports the result type that was passed to the constructor.
+		@Override
+		public TypeInformation<Tuple2<K, M>> getProducedType() {
+			return this.resultType;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class ApplyUdf<K, VV, EV, M> extends RichFlatJoinFunction<Tuple2<K, M>,
+			Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {
+		// Join UDF that passes each summed value and the matching vertex to the user-defined apply function.
+		private final ApplyFunction<K, VV, M> applyFunction;
+		private transient TypeInformation<Vertex<K, VV>> resultType; // transient: not serialized with the function
+
+		private ApplyUdf(ApplyFunction<K, VV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
+			this.applyFunction = applyFunction;
+			this.resultType = resultType;
+		}
+		// Nothing is collected here directly; the apply function receives the vertex and the collector via setOutput().
+		@Override
+		public void join(Tuple2<K, M> newValue, final Vertex<K, VV> currentValue, final Collector<Vertex<K, VV>> out) throws Exception {
+			// setOutput() must precede apply() so that the function has its output target for this vertex
+			this.applyFunction.setOutput(currentValue, out);
+			this.applyFunction.apply(newValue.f1, currentValue.getValue());
+		}
+		// Hands the runtime context to the user function in superstep 1 only; preSuperstep() runs on every open().
+		@Override
+		public void open(Configuration parameters) throws Exception {
+			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
+				this.applyFunction.init(getIterationRuntimeContext());
+			}
+			this.applyFunction.preSuperstep();
+		}
+		// Runs the user function's post-superstep hook when this UDF is closed.
+		@Override
+		public void close() throws Exception {
+			this.applyFunction.postSuperstep();
+		}
+		// Reports the result type that was passed to the constructor.
+		@Override
+		public TypeInformation<Vertex<K, VV>> getProducedType() {
+			return this.resultType;
+		}
+	}
+
+	@SuppressWarnings("serial")
+	@ForwardedFieldsSecond("f1->f0")
+	private static final class ProjectKeyWithNeighborOUT<K, VV, EV> implements FlatJoinFunction<
+			Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
+		// Emits (edge target, neighbor); the neighbor carries the joined vertex's value and the edge value.
+		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
+			out.collect(new Tuple2<K, Neighbor<VV, EV>>(
+					edge.getTarget(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	@ForwardedFieldsSecond({"f0"})
+	private static final class ProjectKeyWithNeighborIN<K, VV, EV> implements FlatJoinFunction<
+			Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
+		// Emits (edge source, neighbor); the neighbor carries the joined vertex's value and the edge value.
+		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
+			out.collect(new Tuple2<K, Neighbor<VV, EV>>(
+					edge.getSource(), new Neighbor<VV, EV>(vertex.getValue(), edge.getValue())));
+		}
+	}
+
+
+
+
+	/**
+	 * Configures this gather-sum-apply iteration with the provided parameters.
+	 * Calling it is optional: without a configuration, createResult() uses a default name and the OUT direction.
+	 *
+	 * @param parameters the configuration parameters; replaces any previously set configuration
+	 */
+	public void configure(GSAConfiguration parameters) {
+		this.configuration = parameters;
+	}
+
+	/**
+	 * @return the configuration parameters of this gather-sum-apply iteration, or null if none has been set
+	 */
+	public GSAConfiguration getIterationConfiguration() {
+		return this.configuration;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
new file mode 100755
index 0000000..7fa1ed2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * This class represents a {@code <sourceVertex, edge>} pair.
+ * This is a wrapper around {@code Tuple2<VV, EV>} for convenience in the GatherFunction.
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+@SuppressWarnings("serial")
+public class Neighbor<VV, EV> extends Tuple2<VV, EV> {
+
+	public Neighbor() {}
+
+	public Neighbor(VV neighborValue, EV edgeValue) {
+		super(neighborValue, edgeValue);
+	}
+
+	public VV getNeighborValue() { // first tuple field
+		return this.f0;
+	}
+
+	public EV getEdgeValue() { // second tuple field
+		return this.f1;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
new file mode 100755
index 0000000..f27e275
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.gsa;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.types.Value;
+
+import java.io.Serializable;
+import java.util.Collection;
+
+/**
+ * The base class for the second step of a {@link GatherSumApplyIteration}.
+ *
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ * @param <M> the output type
+ */
+@SuppressWarnings("serial")
+public abstract class SumFunction<VV, EV, M> implements Serializable {
+
+	// --------------------------------------------------------------------------------------------
+	//  Attribute that allows access to the total number of vertices inside an iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private long numberOfVertices = -1L;
+
+	/**
+	 * Retrieves the number of vertices in the graph.
+	 * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getNumberOfVertices() {
+		return numberOfVertices;
+	}
+
+	void setNumberOfVertices(long numberOfVertices) {
+		this.numberOfVertices = numberOfVertices;
+	}
+
+	//---------------------------------------------------------------------------------------------
+	/**
+	 * This method is invoked once per superstep, after the {@link GatherFunction}
+	 * in a {@link GatherSumApplyIteration}.
+	 * It combines the partial values produced by {@link GatherFunction#gather(Neighbor)}
+	 * in pairs, until a single value has been computed.
+	 *
+	 * @param arg0 the first partial value.
+	 * @param arg1 the second partial value.
+	 * @return the combined value.
+	 */
+	public abstract M sum(M arg0, M arg1);
+
+	/**
+	 * This method is executed once per superstep before {@link #sum(Object, Object)} is invoked in that superstep.
+	 *
+	 * No checked exception is declared; a runtime exception thrown here propagates to the caller.
+	 */
+	public void preSuperstep() {}
+
+	/**
+	 * This method is executed once per superstep after {@link #sum(Object, Object)} has been invoked in that superstep.
+	 *
+	 * No checked exception is declared; a runtime exception thrown here propagates to the caller.
+	 */
+	public void postSuperstep() {}
+
+	/**
+	 * Gets the number of the superstep, starting at <tt>1</tt>.
+	 * Requires that {@link #init(IterationRuntimeContext)} has been called; the context is null before that.
+	 * @return The number of the current superstep.
+	 */
+	public int getSuperstepNumber() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+
+	/**
+	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+	 * all aggregates globally once per superstep and makes them available in the next superstep.
+	 *
+	 * @param name The name of the aggregator.
+	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
+	 */
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+
+	/**
+	 * Get the aggregated value that an aggregator computed in the previous iteration.
+	 *
+	 * @param name The name of the aggregator.
+	 * @return The aggregated value of the previous iteration.
+	 */
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+
+	/**
+	 * Gets the broadcast data set registered under the given name. Broadcast data sets
+	 * are available on all parallel instances of a function.
+	 *
+	 * @param name The name under which the broadcast set is registered.
+	 * @return The broadcast data set.
+	 */
+	public <T> Collection<T> getBroadcastSet(String name) {
+		return this.runtimeContext.<T>getBroadcastVariable(name);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Internal methods
+	// --------------------------------------------------------------------------------------------
+
+	private IterationRuntimeContext runtimeContext; // null until init(...) is called
+
+	public void init(IterationRuntimeContext iterationRuntimeContext) { // stores the context used by the accessors above
+		this.runtimeContext = iterationRuntimeContext;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
new file mode 100644
index 0000000..0dd39fc
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+import java.util.Map;
+import java.util.TreeMap;
+
+/**
+ * Community Detection Algorithm.
+ *
+ * This implementation expects Long Vertex values and labels. The Vertex values of the input Graph provide the initial label assignments.
+ * 
+ * Initially, each vertex is assigned a tuple formed of its own initial value along with a score equal to 1.0.
+ * The vertices propagate their labels and max scores in iterations, each time adopting the label with the
+ * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction
+ * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value.
+ *
+ * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
+ * is reached.
+ * 
+ * @param <K> the Vertex ID type 
+ *
+ * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
+ */
+public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Graph<K, Long, Double>> {
+
+	private Integer maxIterations;
+
+	private Double delta;
+
+	public CommunityDetection(Integer maxIterations, Double delta) {
+
+		this.maxIterations = maxIterations;
+		this.delta = delta;
+	}
+
+	@Override
+	public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) {
+
+		DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = graph.getVertices()
+				.map(new AddScoreToVertexValuesMapper<K>());
+
+		Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
+				Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected();
+
+		return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater<K>(delta),
+				new LabelMessenger<K>(), maxIterations)
+				.mapVertices(new RemoveScoreFromVertexValuesMapper<K>());
+	}
+
+	@SuppressWarnings("serial")
+	public static final class VertexLabelUpdater<K> extends VertexUpdateFunction<
+		K, Tuple2<Long, Double>, Tuple2<Long, Double>> {
+
+		private Double delta;
+
+		public VertexLabelUpdater(Double delta) {
+			this.delta = delta;
+		}
+
+		@Override
+		public void updateVertex(Vertex<K, Tuple2<Long, Double>> vertex,
+								MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
+
+			// we would like these two maps to be ordered
+			Map<Long, Double> receivedLabelsWithScores = new TreeMap<Long, Double>();
+			Map<Long, Double> labelsWithHighestScore = new TreeMap<Long, Double>();
+
+			for (Tuple2<Long, Double> message : inMessages) {
+				// split the message into received label and score
+				Long receivedLabel = message.f0;
+				Double receivedScore = message.f1;
+
+				// if the label was received before
+				if (receivedLabelsWithScores.containsKey(receivedLabel)) {
+					Double newScore = receivedScore + receivedLabelsWithScores.get(receivedLabel);
+					receivedLabelsWithScores.put(receivedLabel, newScore);
+				} else {
+					// first time we see the label
+					receivedLabelsWithScores.put(receivedLabel, receivedScore);
+				}
+
+				// store the labels with the highest scores
+				if (labelsWithHighestScore.containsKey(receivedLabel)) {
+					Double currentScore = labelsWithHighestScore.get(receivedLabel);
+					if (currentScore < receivedScore) {
+						// record the highest score
+						labelsWithHighestScore.put(receivedLabel, receivedScore);
+					}
+				} else {
+					// first time we see this label
+					labelsWithHighestScore.put(receivedLabel, receivedScore);
+				}
+			}
+
+			if(receivedLabelsWithScores.size() > 0) {
+				// find the label with the highest score from the ones received
+				Double maxScore = -Double.MAX_VALUE;
+				Long maxScoreLabel = vertex.getValue().f0;
+				for (Long curLabel : receivedLabelsWithScores.keySet()) {
+
+					if (receivedLabelsWithScores.get(curLabel) > maxScore) {
+						maxScore = receivedLabelsWithScores.get(curLabel);
+						maxScoreLabel = curLabel;
+					}
+				}
+
+				// find the highest score of maxScoreLabel
+				Double highestScore = labelsWithHighestScore.get(maxScoreLabel);
+				// re-score the new label
+				if (maxScoreLabel != vertex.getValue().f0) {
+					highestScore -= delta / getSuperstepNumber();
+				}
+				// else delta = 0
+				// update own label
+				setNewVertexValue(new Tuple2<Long, Double>(maxScoreLabel, highestScore));
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class LabelMessenger<K> extends MessagingFunction<K, Tuple2<Long, Double>,
+			Tuple2<Long, Double>, Double> {
+
+		@Override
+		public void sendMessages(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
+
+			for(Edge<K, Double> edge : getEdges()) {
+				sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0,
+						vertex.getValue().f1 * edge.getValue()));
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	@ForwardedFields("f0")
+	public static final class AddScoreToVertexValuesMapper<K> implements MapFunction<
+		Vertex<K, Long>, Vertex<K, Tuple2<Long, Double>>> {
+
+		public Vertex<K, Tuple2<Long, Double>> map(Vertex<K, Long> vertex) {
+			return new Vertex<K, Tuple2<Long, Double>>(
+					vertex.getId(), new Tuple2<Long, Double>(vertex.getValue(), 1.0));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	public static final class RemoveScoreFromVertexValuesMapper<K> implements MapFunction<
+		Vertex<K, Tuple2<Long, Double>>, Long> {
+
+		@Override
+		public Long map(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
+			return vertex.getValue().f0;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
new file mode 100644
index 0000000..ed853fe
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.NullValue;
+
+/**
+ * Connected Components, implemented as a vertex-centric iteration.
+ *
+ * <p>The vertices of the input Graph are expected to be initialized with unique, Long component IDs.
+ * In every superstep each vertex sends its current component ID to its neighbors and adopts the
+ * smallest ID it receives, provided that this ID is smaller than the one it currently holds.
+ *
+ * <p>The algorithm converges when vertices no longer update their value or when the maximum number
+ * of iterations is reached.
+ *
+ * <p>The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID.
+ *
+ * @see org.apache.flink.graph.library.GSAConnectedComponents
+ */
+@SuppressWarnings("serial")
+public class ConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
+
+	private final Integer maxIterations;
+
+	/**
+	 * @param maxIterations the maximum number of iterations
+	 */
+	public ConnectedComponents(Integer maxIterations) {
+		this.maxIterations = maxIterations;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
+
+		// replace the edge values with NullValue and obtain the undirected version of the graph
+		Graph<K, Long, NullValue> graphWithoutEdgeValues = graph.mapEdges(new NullValueEdgeMapper<K, EV>());
+		Graph<K, Long, NullValue> undirectedGraph = graphWithoutEdgeValues.getUndirected();
+
+		// run the vertex-centric iteration, starting from the vertex values of the input graph
+		Graph<K, Long, NullValue> result = undirectedGraph.runVertexCentricIteration(
+				new CCUpdater<K>(), new CCMessenger<K>(), maxIterations);
+
+		return result.getVertices();
+	}
+
+	/**
+	 * Updates the value of a vertex with the smallest ID among the incoming messages,
+	 * if that ID is smaller than the current vertex value.
+	 */
+	public static final class CCUpdater<K> extends VertexUpdateFunction<K, Long, Long> {
+
+		@Override
+		public void updateVertex(Vertex<K, Long> vertex, MessageIterator<Long> messages) throws Exception {
+			long current = vertex.getValue();
+			long smallest = current;
+
+			for (long candidate : messages) {
+				if (candidate < smallest) {
+					smallest = candidate;
+				}
+			}
+
+			// only set a new value when a received ID is smaller than the current one
+			if (smallest < current) {
+				setNewVertexValue(smallest);
+			}
+		}
+	}
+
+	/**
+	 * Sends the current value of a vertex to all of its neighbors.
+	 */
+	public static final class CCMessenger<K> extends MessagingFunction<K, Long, Long, NullValue> {
+
+		@Override
+		public void sendMessages(Vertex<K, Long> vertex) throws Exception {
+			Long currentId = vertex.getValue();
+			sendMessageToAllNeighbors(currentId);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
new file mode 100755
index 0000000..77bc2cf
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.NullValue;
+
+/**
+ * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration.
+ * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs.
+ * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID.
+ *
+ * @see org.apache.flink.graph.library.ConnectedComponents
+ */
+public class GSAConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
+
+	private final Integer maxIterations;
+
+	/**
+	 * @param maxIterations the maximum number of iterations
+	 */
+	public GSAConnectedComponents(Integer maxIterations) {
+		this.maxIterations = maxIterations;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
+
+		// replace the edge values with NullValue and obtain the undirected version of the graph
+		Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>())
+				.getUndirected();
+
+		// run the gather-sum-apply iteration, starting from the vertex values of the input graph
+		return undirectedGraph.runGatherSumApplyIteration(
+				new GatherNeighborIds(), new SelectMinId(), new UpdateComponentId<K>(),
+				maxIterations).getVertices();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Connected Components UDFs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gather: returns the value (component ID) of the neighbor.
+	 */
+	@SuppressWarnings("serial")
+	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
+
+		@Override
+		public Long gather(Neighbor<Long, NullValue> neighbor) {
+			return neighbor.getNeighborValue();
+		}
+	}
+
+	/**
+	 * Sum: keeps the smaller of two gathered component IDs.
+	 */
+	@SuppressWarnings("serial")
+	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
+
+		@Override
+		public Long sum(Long newValue, Long currentValue) {
+			return Math.min(newValue, currentValue);
+		}
+	}
+
+	/**
+	 * Apply: sets the selected component ID as the result, if it is smaller than the original value.
+	 */
+	@SuppressWarnings("serial")
+	private static final class UpdateComponentId<K> extends ApplyFunction<K, Long, Long> {
+
+		@Override
+		public void apply(Long summedValue, Long origValue) {
+			if (summedValue < origValue) {
+				setResult(summedValue);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
new file mode 100644
index 0000000..df3e89a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+import org.apache.flink.graph.gsa.SumFunction;
+
+/**
+ * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
+ * The user can define the damping factor and the maximum number of iterations.
+ * If the number of vertices of the input graph is known, it should be provided as a parameter
+ * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
+ *
+ * The implementation assumes that each page has at least one incoming and one outgoing link.
+ */
+public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
+
+	private final double beta;
+	private final int maxIterations;
+	// 0 means "not provided": the vertices are counted when the algorithm is run
+	private final long numberOfVertices;
+
+	/**
+	 * @param beta the damping factor
+	 * @param maxIterations the maximum number of iterations
+	 */
+	public GSAPageRank(double beta, int maxIterations) {
+		this.beta = beta;
+		this.numberOfVertices = 0;
+		this.maxIterations = maxIterations;
+	}
+
+	/**
+	 * @param beta the damping factor
+	 * @param numVertices the number of vertices of the input graph; if 0, the vertices are counted
+	 * @param maxIterations the maximum number of iterations
+	 */
+	public GSAPageRank(double beta, long numVertices, int maxIterations) {
+		this.beta = beta;
+		this.numberOfVertices = numVertices;
+		this.maxIterations = maxIterations;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
+
+		// keep the resolved count in a local variable, so that a count computed for this graph
+		// is not stored in the instance and applied to a different graph on a later call
+		long vertexCount = numberOfVertices;
+		if (vertexCount == 0) {
+			vertexCount = network.numberOfVertices();
+		}
+
+		DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
+
+		// divide each edge value by the out-degree of the edge's source vertex
+		Graph<K, Double, Double> networkWithWeights = network
+				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+
+		return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(vertexCount), new SumRanks(),
+				new UpdateRanks<K>(beta, vertexCount), maxIterations)
+				.getVertices();
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Page Rank UDFs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gather: returns the rank of the neighbor multiplied by the edge value.
+	 * In the first superstep, 1 / numberOfVertices is used as the rank of the neighbor,
+	 * instead of the neighbor value.
+	 */
+	@SuppressWarnings("serial")
+	private static final class GatherRanks extends GatherFunction<Double, Double, Double> {
+
+		private final long numberOfVertices;
+
+		public GatherRanks(long numberOfVertices) {
+			this.numberOfVertices = numberOfVertices;
+		}
+
+		@Override
+		public Double gather(Neighbor<Double, Double> neighbor) {
+			double neighborRank = neighbor.getNeighborValue();
+
+			if (getSuperstepNumber() == 1) {
+				neighborRank = 1.0 / numberOfVertices;
+			}
+
+			return neighborRank * neighbor.getEdgeValue();
+		}
+	}
+
+	/**
+	 * Sum: adds up two gathered partial ranks.
+	 */
+	@SuppressWarnings("serial")
+	private static final class SumRanks extends SumFunction<Double, Double, Double> {
+
+		@Override
+		public Double sum(Double newValue, Double currentValue) {
+			return newValue + currentValue;
+		}
+	}
+
+	/**
+	 * Apply: sets the new rank to (1 - beta) / numVertices + beta * rankSum.
+	 * The current value of the vertex is not used.
+	 */
+	@SuppressWarnings("serial")
+	private static final class UpdateRanks<K> extends ApplyFunction<K, Double, Double> {
+
+		private final double beta;
+		private final long numVertices;
+
+		public UpdateRanks(double beta, long numberOfVertices) {
+			this.beta = beta;
+			this.numVertices = numberOfVertices;
+		}
+
+		@Override
+		public void apply(Double rankSum, Double currentValue) {
+			setResult((1 - beta) / numVertices + beta * rankSum);
+		}
+	}
+
+	/**
+	 * Divides the edge value (f0) by the out-degree (f1) that was joined on the edge source.
+	 */
+	@SuppressWarnings("serial")
+	private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
+
+		@Override
+		public Double map(Tuple2<Double, Long> value) {
+			return value.f0 / value.f1;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
new file mode 100755
index 0000000..5a76072
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GatherFunction;
+import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.graph.gsa.Neighbor;
+
+/**
+ * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration.
+ *
+ * The vertex values of the input graph are overwritten before the iteration starts: the source vertex
+ * is set to 0.0 and every other vertex is set to Double.MAX_VALUE.
+ * The result is a DataSet of vertices, where the vertex value is the computed distance.
+ */
+public class GSASingleSourceShortestPaths<K> implements
+	GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
+
+	private final K srcVertexId;
+	private final Integer maxIterations;
+
+	/**
+	 * @param srcVertexId the ID of the source vertex
+	 * @param maxIterations the maximum number of iterations
+	 */
+	public GSASingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
+		this.srcVertexId = srcVertexId;
+		this.maxIterations = maxIterations;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
+
+		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+				.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
+						new UpdateDistance<K>(), maxIterations)
+						.getVertices();
+	}
+
+	/**
+	 * Initializes the vertex values: 0.0 for the source vertex and Double.MAX_VALUE for all others.
+	 */
+	@SuppressWarnings("serial")
+	public static final class InitVerticesMapper<K>	implements MapFunction<Vertex<K, Double>, Double> {
+
+		private final K srcVertexId;
+
+		public InitVerticesMapper(K srcId) {
+			this.srcVertexId = srcId;
+		}
+
+		@Override
+		public Double map(Vertex<K, Double> value) {
+			if (value.getId().equals(srcVertexId)) {
+				return 0.0;
+			} else {
+				return Double.MAX_VALUE;
+			}
+		}
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Single Source Shortest Path UDFs
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gather: returns the value of the neighbor plus the value of the connecting edge.
+	 */
+	@SuppressWarnings("serial")
+	private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
+
+		@Override
+		public Double gather(Neighbor<Double, Double> neighbor) {
+			return neighbor.getNeighborValue() + neighbor.getEdgeValue();
+		}
+	}
+
+	/**
+	 * Sum: keeps the smaller of two candidate distances.
+	 */
+	@SuppressWarnings("serial")
+	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
+
+		@Override
+		public Double sum(Double newValue, Double currentValue) {
+			return Math.min(newValue, currentValue);
+		}
+	}
+
+	/**
+	 * Apply: sets the new distance as the result, if it is smaller than the old distance.
+	 */
+	@SuppressWarnings("serial")
+	private static final class UpdateDistance<K> extends ApplyFunction<K, Double, Double> {
+
+		@Override
+		public void apply(Double newDistance, Double oldDistance) {
+			if (newDistance < oldDistance) {
+				setResult(newDistance);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
new file mode 100644
index 0000000..76d170d
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
@@ -0,0 +1,190 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.ReduceNeighborsFunction;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Triplet;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.types.NullValue;
+
+import java.util.TreeMap;
+
+/**
+ * Triangle Count Algorithm.
+ *
+ * This algorithm operates in three phases. First, vertices select neighbors with id greater than theirs
+ * and send messages to them. Each received message is then propagated to neighbors with higher id.
+ * Finally, if a node encounters the target id in the list of received messages, it increments the number
+ * of triangles found.
+ *
+ * This implementation is non-iterative.
+ *
+ * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet
+ * which contains a single integer representing the number of triangles.
+ */
+public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements
+		GraphAlgorithm<K, VV, EV, DataSet<Integer>> {
+
+	@SuppressWarnings("serial")
+	@Override
+	public DataSet<Integer> run(Graph<K, VV, EV> input) throws Exception {
+
+		ExecutionEnvironment env = input.getContext();
+
+		// orient every edge from the higher id to the lower id and drop duplicates
+		DataSet<Edge<K, NullValue>> orderedEdges = input.getEdges()
+				.map(new OrderEdges<K, EV>())
+				.distinct();
+
+		// every vertex starts with a map that contains only its own id, with count 1
+		Graph<K, TreeMap<K, Integer>, NullValue> initialGraph =
+				Graph.fromDataSet(orderedEdges, new VertexInitializer<K>(), env);
+
+		// phase 1: merge the maps of the in-neighbors of every vertex
+		// Gather: a no-op in this case
+		// Sum: create the set of neighbors
+		DataSet<Tuple2<K, TreeMap<K, Integer>>> higherIdNeighbors =
+				initialGraph.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
+
+		// same vertices, but with empty maps as values
+		Graph<K, TreeMap<K, Integer>, NullValue> emptyValuedGraph =
+				initialGraph.mapVertices(new VertexInitializerEmptyTreeMap<K>());
+
+		// Apply: attach the merged maps to the vertices
+		DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithHigherIdNeighbors = emptyValuedGraph
+				.joinWithVertices(higherIdNeighbors, new AttachValues<K>())
+				.getVertices();
+
+		Graph<K, TreeMap<K, Integer>, NullValue> graphWithNeighbors =
+				Graph.fromDataSet(verticesWithHigherIdNeighbors, orderedEdges, env);
+
+		// phase 2: propagate the attached maps once more along the in-edges
+		// Gather: a no-op in this case
+		// Sum: propagate values
+		DataSet<Tuple2<K, TreeMap<K, Integer>>> propagatedValues =
+				graphWithNeighbors.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
+
+		// Apply: attach the propagated maps to the vertices
+		DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithPropagatedValues = emptyValuedGraph
+				.joinWithVertices(propagatedValues, new AttachValues<K>())
+				.getVertices();
+
+		Graph<K, TreeMap<K, Integer>, NullValue> graphWithPropagatedNeighbors =
+				Graph.fromDataSet(verticesWithPropagatedValues, graphWithNeighbors.getEdges(), env);
+
+		// phase 3 (Scatter): compute a count per triplet and add the counts up
+		return graphWithPropagatedNeighbors.getTriplets()
+				.map(new ComputeTriangles<K>())
+				.reduce(new ReduceFunction<Integer>() {
+
+					@Override
+					public Integer reduce(Integer left, Integer right) throws Exception {
+						return left + right;
+					}
+				});
+	}
+
+	/**
+	 * Re-orients an edge so that its source is never smaller than its target,
+	 * and replaces the edge value with NullValue.
+	 */
+	@SuppressWarnings("serial")
+	private static final class OrderEdges<K extends Comparable<K>, EV> implements
+		MapFunction<Edge<K, EV>, Edge<K, NullValue>> {
+
+		@Override
+		public Edge<K, NullValue> map(Edge<K, EV> edge) throws Exception {
+			boolean sourceIsSmaller = edge.getSource().compareTo(edge.getTarget()) < 0;
+
+			K higher = sourceIsSmaller ? edge.getTarget() : edge.getSource();
+			K lower = sourceIsSmaller ? edge.getSource() : edge.getTarget();
+
+			return new Edge<K, NullValue>(higher, lower, NullValue.getInstance());
+		}
+	}
+
+	/**
+	 * Creates the initial vertex value: a map that contains only the vertex id, mapped to 1.
+	 */
+	@SuppressWarnings("serial")
+	private static final class VertexInitializer<K> implements MapFunction<K, TreeMap<K, Integer>> {
+
+		@Override
+		public TreeMap<K, Integer> map(K value) throws Exception {
+			TreeMap<K, Integer> ownIdOnly = new TreeMap<K, Integer>();
+			ownIdOnly.put(value, 1);
+
+			return ownIdOnly;
+		}
+	}
+
+	/**
+	 * Replaces the value of a vertex with a new, empty map.
+	 */
+	@SuppressWarnings("serial")
+	private static final class VertexInitializerEmptyTreeMap<K> implements
+			MapFunction<Vertex<K, TreeMap<K, Integer>>, TreeMap<K, Integer>> {
+
+		@Override
+		public TreeMap<K, Integer> map(Vertex<K, TreeMap<K, Integer>> vertex) throws Exception {
+			return new TreeMap<K, Integer>();
+		}
+	}
+
+	/**
+	 * Returns the second field of the joined pair (the joined-in map), ignoring the first field.
+	 */
+	@SuppressWarnings("serial")
+	private static final class AttachValues<K> implements MapFunction<Tuple2<TreeMap<K, Integer>,
+			TreeMap<K, Integer>>, TreeMap<K, Integer>> {
+
+		@Override
+		public TreeMap<K, Integer> map(Tuple2<TreeMap<K, Integer>, TreeMap<K, Integer>> joined) throws Exception {
+			return joined.f1;
+		}
+	}
+
+	/**
+	 * Merges the second map into the first one and returns the first map. The counts of keys
+	 * that are present in both maps are added up.
+	 */
+	@SuppressWarnings("serial")
+	private static final class GatherHigherIdNeighbors<K> implements
+		ReduceNeighborsFunction<TreeMap<K,Integer>> {
+
+		@Override
+		public TreeMap<K, Integer> reduceNeighbors(TreeMap<K,Integer> first, TreeMap<K,Integer> second) {
+			for (java.util.Map.Entry<K, Integer> incoming : second.entrySet()) {
+				K key = incoming.getKey();
+				Integer existing = first.get(key);
+
+				if (existing == null) {
+					first.put(key, incoming.getValue());
+				} else {
+					first.put(key, existing + incoming.getValue());
+				}
+			}
+			return first;
+		}
+	}
+
+	/**
+	 * Returns the count that the map of the target vertex holds for the id of the source vertex,
+	 * or 0 if the map has no entry for that id.
+	 */
+	@SuppressWarnings("serial")
+	private static final class ComputeTriangles<K> implements MapFunction<Triplet<K, TreeMap<K, Integer>, NullValue>,
+			Integer> {
+
+		@Override
+		public Integer map(Triplet<K, TreeMap<K, Integer>, NullValue> triplet) throws Exception {
+
+			K sourceId = triplet.getSrcVertex().getId();
+			Integer count = triplet.getTrgVertex().getValue().get(sourceId);
+
+			if (count == null) {
+				return 0;
+			}
+			return count;
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
new file mode 100644
index 0000000..82dfee7
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.graph.utils.NullValueEdgeMapper;
+import org.apache.flink.types.NullValue;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
+/**
+ * An implementation of the label propagation algorithm. The iterative algorithm
+ * detects communities by propagating labels. In each iteration, a vertex adopts
+ * the label that is most frequent among its neighbors' labels. Labels are
+ * represented by Longs and we assume a total ordering among them, in order to
+ * break ties. The algorithm converges when no vertex changes its value or the
+ * maximum number of iterations has been reached. Note that different
+ * initializations might lead to different results.
+ */
+@SuppressWarnings("serial")
+public class LabelPropagation<K extends Comparable<K>, EV> implements GraphAlgorithm<K, Long, EV,
+	DataSet<Vertex<K, Long>>> {
+
+	private final int maxIterations;
+
+	/**
+	 * @param maxIterations the maximum number of iterations
+	 */
+	public LabelPropagation(int maxIterations) {
+		this.maxIterations = maxIterations;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> input) {
+
+		// the edge values are not needed, replace them with NullValue
+		Graph<K, Long, NullValue> graphWithoutEdgeValues = input.mapEdges(new NullValueEdgeMapper<K, EV>());
+
+		// iteratively adopt the most frequent label among the neighbors of each vertex
+		return graphWithoutEdgeValues
+				.runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(), maxIterations)
+				.getVertices();
+	}
+
+	/**
+	 * Function that updates the value of a vertex by adopting the most frequent
+	 * label among the received messages. The current label of the vertex takes part
+	 * in the selection as a candidate with frequency 1.
+	 */
+	public static final class UpdateVertexLabel<K> extends VertexUpdateFunction<K, Long, Long> {
+
+		@Override
+		public void updateVertex(Vertex<K, Long> vertex,
+				MessageIterator<Long> inMessages) {
+			Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>();
+
+			// count how many times each label was received
+			for (Long label : inMessages) {
+				Long seenSoFar = labelsWithFrequencies.get(label);
+				if (seenSoFar == null) {
+					labelsWithFrequencies.put(label, 1L);
+				} else {
+					labelsWithFrequencies.put(label, seenSoFar + 1);
+				}
+			}
+
+			long bestFrequency = 1;
+			long bestLabel = vertex.getValue();
+
+			// pick the label with the highest frequency; among labels with the
+			// same frequency, the label with the highest value wins
+			for (Entry<Long, Long> candidate : labelsWithFrequencies.entrySet()) {
+				long frequency = candidate.getValue();
+
+				if (frequency > bestFrequency) {
+					bestFrequency = frequency;
+					bestLabel = candidate.getKey();
+				} else if (frequency == bestFrequency && candidate.getKey() > bestLabel) {
+					bestLabel = candidate.getKey();
+				}
+			}
+
+			// the value is set in every call, also when the label did not change
+			setNewVertexValue(bestLabel);
+		}
+	}
+
+	/**
+	 * Sends the vertex label to all neighbors
+	 */
+	public static final class SendNewLabelToNeighbors<K> extends MessagingFunction<K, Long, Long, NullValue> {
+
+		@Override
+		public void sendMessages(Vertex<K, Long> vertex) {
+			Long label = vertex.getValue();
+			sendMessageToAllNeighbors(label);
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
new file mode 100644
index 0000000..8193dba
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * This is an implementation of a simple PageRank algorithm, using a vertex-centric iteration.
+ * The user can define the damping factor and the maximum number of iterations.
+ * If the number of vertices of the input graph is known, it should be provided as a parameter
+ * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
+ *
+ * The implementation assumes that each page has at least one incoming and one outgoing link.
+ *
+ * @param <K> the type of the vertex key
+ */
+public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
+
+	private final double beta;
+	private final int maxIterations;
+
+	// a value of 0 means "unknown": the vertices are counted when run() is invoked
+	private final long numberOfVertices;
+
+	/**
+	 * Creates a PageRank instance that counts the vertices of the input graph itself.
+	 *
+	 * @param beta the damping factor
+	 * @param maxIterations the maximum number of iterations
+	 */
+	public PageRank(double beta, int maxIterations) {
+		this(beta, 0L, maxIterations);
+	}
+
+	/**
+	 * Creates a PageRank instance for a graph with a known number of vertices.
+	 *
+	 * @param beta the damping factor
+	 * @param numVertices the number of vertices in the input; 0 is treated as unknown
+	 * @param maxIterations the maximum number of iterations
+	 */
+	public PageRank(double beta, long numVertices, int maxIterations) {
+		this.beta = beta;
+		this.maxIterations = maxIterations;
+		this.numberOfVertices = numVertices;
+	}
+
+	/**
+	 * Runs the vertex-centric PageRank iteration on the given graph.
+	 *
+	 * @param network the input graph
+	 * @return the vertices of the graph after the iteration, carrying their ranks as values
+	 * @throws Exception propagated from the graph operations used to set up the iteration
+	 */
+	@Override
+	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
+
+		// Resolve the vertex count into a local variable instead of caching it in a field,
+		// so that reusing this instance on a different graph never works with a stale count.
+		long vertexCount = numberOfVertices;
+		if (vertexCount == 0) {
+			vertexCount = network.numberOfVertices();
+		}
+
+		DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
+
+		// divide every edge value by the out-degree of the edge's source vertex
+		Graph<K, Double, Double> networkWithWeights = network
+				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
+
+		return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, vertexCount),
+				new RankMessenger<K>(vertexCount), maxIterations)
+				.getVertices();
+	}
+
+	/**
+	 * Function that updates the rank of a vertex by summing up the partial
+	 * ranks from all incoming messages and then applying the dampening formula.
+	 *
+	 * @param <K> the type of the vertex key
+	 */
+	@SuppressWarnings("serial")
+	public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
+
+		private final double beta;
+		private final long numVertices;
+
+		/**
+		 * @param beta the damping factor
+		 * @param numberOfVertices the number of vertices in the graph
+		 */
+		public VertexRankUpdater(double beta, long numberOfVertices) {
+			this.beta = beta;
+			this.numVertices = numberOfVertices;
+		}
+
+		/**
+		 * Sets the new rank of the vertex to {@code beta * sum(messages) + (1 - beta) / numVertices}.
+		 *
+		 * @param vertex the vertex being updated
+		 * @param inMessages the partial ranks received by the vertex
+		 */
+		@Override
+		public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
+			double rankSum = 0.0;
+			for (double msg : inMessages) {
+				rankSum += msg;
+			}
+
+			// apply the dampening factor / random jump
+			double newRank = (beta * rankSum) + (1 - beta) / numVertices;
+			setNewVertexValue(newRank);
+		}
+	}
+
+	/**
+	 * Distributes the rank of a vertex among all target vertices according to
+	 * the transition probability, which is associated with an edge as the edge
+	 * value.
+	 *
+	 * @param <K> the type of the vertex key
+	 */
+	@SuppressWarnings("serial")
+	public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
+
+		private final long numVertices;
+
+		/**
+		 * @param numberOfVertices the number of vertices in the graph
+		 */
+		public RankMessenger(long numberOfVertices) {
+			this.numVertices = numberOfVertices;
+		}
+
+		/**
+		 * Sends {@code rank * edgeValue} to the target of every edge returned by {@code getEdges()}.
+		 * In the first superstep the value of the given vertex is set to {@code 1 / numVertices}
+		 * before the messages are produced.
+		 *
+		 * @param vertex the vertex whose rank is distributed
+		 */
+		@Override
+		public void sendMessages(Vertex<K, Double> vertex) {
+			if (getSuperstepNumber() == 1) {
+				// initialize vertex ranks
+				vertex.setValue(1.0 / numVertices);
+			}
+
+			for (Edge<K, Double> edge : getEdges()) {
+				sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
+			}
+		}
+	}
+
+	/**
+	 * Divides the first tuple field by the second one. In {@link PageRank#run(Graph)} it is
+	 * applied to pairs of (edge value, out-degree of the edge's source vertex).
+	 */
+	@SuppressWarnings("serial")
+	private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
+		@Override
+		public Double map(Tuple2<Double, Long> value) {
+			return value.f0 / value.f1;
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
new file mode 100644
index 0000000..60c4c17
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.library;
+
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.spargel.MessageIterator;
+import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.VertexUpdateFunction;
+
+/**
+ * This is an implementation of the Single-Source-Shortest Paths algorithm, using a vertex-centric iteration.
+ */
+@SuppressWarnings("serial")
+public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
+
+	private final K srcVertexId;
+	private final Integer maxIterations;
+
+	public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
+		this.srcVertexId = srcVertexId;
+		this.maxIterations = maxIterations;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
+
+		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
+				.runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
+				maxIterations).getVertices();
+	}
+
+	public static final class InitVerticesMapper<K>	implements MapFunction<Vertex<K, Double>, Double> {
+
+		private K srcVertexId;
+
+		public InitVerticesMapper(K srcId) {
+			this.srcVertexId = srcId;
+		}
+
+		public Double map(Vertex<K, Double> value) {
+			if (value.f0.equals(srcVertexId)) {
+				return 0.0;
+			} else {
+				return Double.MAX_VALUE;
+			}
+		}
+	}
+
+	/**
+	 * Function that updates the value of a vertex by picking the minimum
+	 * distance from all incoming messages.
+	 * 
+	 * @param <K>
+	 */
+	public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
+
+		@Override
+		public void updateVertex(Vertex<K, Double> vertex,
+				MessageIterator<Double> inMessages) {
+
+			Double minDistance = Double.MAX_VALUE;
+
+			for (double msg : inMessages) {
+				if (msg < minDistance) {
+					minDistance = msg;
+				}
+			}
+
+			if (vertex.getValue() > minDistance) {
+				setNewVertexValue(minDistance);
+			}
+		}
+	}
+
+	/**
+	 * Distributes the minimum distance associated with a given vertex among all
+	 * the target vertices summed up with the edge's value.
+	 * 
+	 * @param <K>
+	 */
+	public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
+
+		@Override
+		public void sendMessages(Vertex<K, Double> vertex)
+				throws Exception {
+			for (Edge<K, Double> edge : getEdges()) {
+				sendMessageTo(edge.getTarget(), vertex.getValue() + edge.getValue());
+			}
+		}
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
new file mode 100644
index 0000000..d6fdc8a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import java.util.Iterator;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+/**
+ * Iterates over the messages contained in a stream of (key, message) tuples, exposing only the
+ * message part of each tuple. The class implements {@link java.lang.Iterable} as well, so that
+ * it can be used directly in a <i>foreach</i> loop; {@link #iterator()} returns this very
+ * instance, which means the messages can be traversed only once per source.
+ *
+ * @param <Message> the type of the messages
+ */
+public final class MessageIterator<Message> implements Iterator<Message>, Iterable<Message>, java.io.Serializable {
+	private static final long serialVersionUID = 1L;
+
+	// transient: the backing iterator is not serialized and has to be supplied via setSource
+	private transient Iterator<Tuple2<?, Message>> source;
+
+	/**
+	 * Installs the tuple iterator this message iterator reads from.
+	 *
+	 * @param source iterator over tuples whose second field holds the message
+	 */
+	final void setSource(Iterator<Tuple2<?, Message>> source) {
+		this.source = source;
+	}
+
+	/**
+	 * @return whether the backing tuple iterator has another element
+	 */
+	@Override
+	public final boolean hasNext() {
+		return source.hasNext();
+	}
+
+	/**
+	 * @return the message stored in the second field of the next tuple
+	 */
+	@Override
+	public final Message next() {
+		Tuple2<?, Message> envelope = source.next();
+		return envelope.f1;
+	}
+
+	/**
+	 * Removal is not supported.
+	 *
+	 * @throws UnsupportedOperationException always
+	 */
+	@Override
+	public final void remove() {
+		throw new UnsupportedOperationException();
+	}
+
+	/**
+	 * @return this object, which acts as its own iterator
+	 */
+	@Override
+	public Iterator<Message> iterator() {
+		return this;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
new file mode 100644
index 0000000..4245c24
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.spargel;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+
+import org.apache.flink.api.common.aggregators.Aggregator;
+import org.apache.flink.api.common.functions.IterationRuntimeContext;
+import org.apache.flink.api.java.tuple.Tuple;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeDirection;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.Value;
+import org.apache.flink.util.Collector;
+
+/**
+ * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
+ *
+ * @param <K> The type of the vertex key (the vertex identifier).
+ * @param <VV> The type of the vertex value (the state of the vertex).
+ * @param <Message> The type of the message sent between vertices along the edges.
+ * @param <EV> The type of the values that are associated with the edges.
+ */
+public abstract class MessagingFunction<K, VV, Message, EV> implements Serializable {
+
+	private static final long serialVersionUID = 1L;
+
+	// --------------------------------------------------------------------------------------------
+	//  Attributes that allow vertices to access their in/out degrees and the total number of vertices
+	//  inside an iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private long numberOfVertices = -1L;
+
+	/**
+	 * Retrieves the number of vertices in the graph.
+	 * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getNumberOfVertices() {
+		return numberOfVertices;
+	}
+
+	void setNumberOfVertices(long numberOfVertices) {
+		this.numberOfVertices = numberOfVertices;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Attribute that allows the user to choose the neighborhood type(in/out/all) on which to run
+	//  the vertex centric iteration.
+	// --------------------------------------------------------------------------------------------
+
+	private EdgeDirection direction;
+
+	/**
+	 * Retrieves the edge direction in which messages are propagated in the vertex-centric iteration.
+	 * @return the messaging {@link EdgeDirection}
+	 */
+	public EdgeDirection getDirection() {
+		return direction;
+	}
+
+	void setDirection(EdgeDirection direction) {
+		this.direction = direction;
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  Public API Methods
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * This method is invoked once per superstep for each vertex that was changed in that superstep.
+	 * It needs to produce the messages that will be received by vertices in the next superstep.
+	 *
+	 * @param vertex The vertex that was changed.
+	 *
+	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
+	 */
+	public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
+
+	/**
+	 * This method is executed once per superstep before the vertex update function is invoked for each vertex.
+	 * The default implementation does nothing.
+	 *
+	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
+	 */
+	public void preSuperstep() throws Exception {}
+
+	/**
+	 * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
+	 * The default implementation does nothing.
+	 *
+	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
+	 */
+	public void postSuperstep() throws Exception {}
+
+
+	/**
+	 * Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with
+	 * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
+	 * <p>
+	 * The returned iterable hands out one and the same {@link Edge} object on every step and
+	 * only overwrites its fields, so an edge must not be retained across iterations.
+	 *
+	 * @return An iterable over all edges of the current vertex.
+	 * @throws IllegalStateException if the edges of the current vertex have already been consumed
+	 */
+	@SuppressWarnings("unchecked")
+	public Iterable<Edge<K, EV>> getEdges() {
+		markEdgesUsed();
+		this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
+		return this.edgeIterator;
+	}
+
+	/**
+	 * Sends the given message to all vertices that are targets of an outgoing edge of the changed vertex.
+	 * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
+	 *
+	 * @param m The message to send.
+	 * @throws IllegalStateException if the edges of the current vertex have already been consumed
+	 */
+	public void sendMessageToAllNeighbors(Message m) {
+		markEdgesUsed();
+
+		outValue.f1 = m;
+
+		while (edges.hasNext()) {
+			Tuple next = (Tuple) edges.next();
+			// field 1 is the field that EdgesIterator uses as the edge target
+			K k = next.getField(1);
+			outValue.f0 = k;
+			out.collect(outValue);
+		}
+	}
+
+	/**
+	 * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
+	 * the next superstep will cause an exception due to a non-deliverable message.
+	 *
+	 * @param target The key (id) of the target vertex to message.
+	 * @param m The message.
+	 */
+	public void sendMessageTo(K target, Message m) {
+		// the same output tuple is reused for every emitted message
+		outValue.f0 = target;
+		outValue.f1 = m;
+		out.collect(outValue);
+	}
+
+	// --------------------------------------------------------------------------------------------
+
+	/**
+	 * Gets the number of the superstep, starting at {@code 1}.
+	 *
+	 * @return The number of the current superstep.
+	 */
+	public int getSuperstepNumber() {
+		return this.runtimeContext.getSuperstepNumber();
+	}
+
+	/**
+	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
+	 * all aggregates globally once per superstep and makes them available in the next superstep.
+	 *
+	 * @param name The name of the aggregator.
+	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
+	 */
+	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
+		return this.runtimeContext.<T>getIterationAggregator(name);
+	}
+
+	/**
+	 * Get the aggregated value that an aggregator computed in the previous iteration.
+	 *
+	 * @param name The name of the aggregator.
+	 * @return The aggregated value of the previous iteration.
+	 */
+	public <T extends Value> T getPreviousIterationAggregate(String name) {
+		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
+	}
+
+	/**
+	 * Gets the broadcast data set registered under the given name. Broadcast data sets
+	 * are available on all parallel instances of a function. They can be registered via
+	 * {@link org.apache.flink.graph.spargel.VertexCentricConfiguration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
+	 *
+	 * @param name The name under which the broadcast set is registered.
+	 * @return The broadcast data set.
+	 */
+	public <T> Collection<T> getBroadcastSet(String name) {
+		return this.runtimeContext.<T>getBroadcastVariable(name);
+	}
+
+	// --------------------------------------------------------------------------------------------
+	//  internal methods and state
+	// --------------------------------------------------------------------------------------------
+
+	private Tuple2<K, Message> outValue;
+
+	private IterationRuntimeContext runtimeContext;
+
+	private Iterator<?> edges;
+
+	private Collector<Tuple2<K, Message>> out;
+
+	private EdgesIterator<K, EV> edgeIterator;
+
+	private boolean edgesUsed;
+
+	private long inDegree = -1;
+
+	private long outDegree = -1;
+
+	/**
+	 * Stores the runtime context and creates the reusable output tuple and edge iterator.
+	 *
+	 * @param context the runtime context of the iteration
+	 */
+	void init(IterationRuntimeContext context) {
+		this.runtimeContext = context;
+		this.outValue = new Tuple2<K, Message>();
+		this.edgeIterator = new EdgesIterator<K, EV>();
+	}
+
+	/**
+	 * Installs the edges and the output collector to use for subsequent calls and resets
+	 * the "edges used" flag, so that the edges may be consumed once again.
+	 *
+	 * @param edges iterator over the edges
+	 * @param out collector that receives the (target, message) tuples
+	 */
+	void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out) {
+		this.edges = edges;
+		this.out = out;
+		this.edgesUsed = false;
+	}
+
+	/**
+	 * Records that the edges are being consumed, failing if that has already happened
+	 * since the last call to {@link #set(Iterator, Collector)}.
+	 */
+	private void markEdgesUsed() {
+		if (edgesUsed) {
+			throw new IllegalStateException(
+					"Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
+		}
+		edgesUsed = true;
+	}
+
+	/**
+	 * Iterator/iterable over edges that copies every element of the wrapped iterator
+	 * into a single reused {@link Edge} instance.
+	 */
+	private static final class EdgesIterator<K, EV>
+		implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
+	{
+		private Iterator<Edge<K, EV>> input;
+
+		// single instance handed out by every call to next()
+		private final Edge<K, EV> edge = new Edge<K, EV>();
+
+		void set(Iterator<Edge<K, EV>> input) {
+			this.input = input;
+		}
+
+		@Override
+		public boolean hasNext() {
+			return input.hasNext();
+		}
+
+		@Override
+		public Edge<K, EV> next() {
+			Edge<K, EV> next = input.next();
+			edge.setSource(next.f0);
+			edge.setTarget(next.f1);
+			edge.setValue(next.f2);
+			return edge;
+		}
+
+		@Override
+		public void remove() {
+			throw new UnsupportedOperationException();
+		}
+		@Override
+		public Iterator<Edge<K, EV>> iterator() {
+			return this;
+		}
+	}
+
+	/**
+	 * Retrieves the vertex in-degree (number of in-coming edges).
+	 * @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getInDegree() {
+		return inDegree;
+	}
+
+	void setInDegree(long inDegree) {
+		this.inDegree = inDegree;
+	}
+
+	/**
+	 * Retrieve the vertex out-degree (number of out-going edges).
+	 * @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
+	 * option has been set; -1 otherwise.
+	 */
+	public long getOutDegree() {
+		return outDegree;
+	}
+
+	void setOutDegree(long outDegree) {
+		this.outDegree = outDegree;
+	}
+}