You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:45 UTC

[07/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
deleted file mode 100755
index 23ccb68..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ /dev/null
@@ -1,425 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.RichFlatJoinFunction;
-import org.apache.flink.api.common.functions.RichMapFunction;
-import org.apache.flink.api.common.functions.RichReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
-import org.apache.flink.api.java.operators.CustomUnaryOperation;
-import org.apache.flink.api.java.operators.DeltaIteration;
-import org.apache.flink.api.java.operators.JoinOperator;
-import org.apache.flink.api.java.operators.MapOperator;
-import org.apache.flink.api.java.operators.ReduceOperator;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.util.Collector;
-import java.util.Map;
-
-import com.google.common.base.Preconditions;
-
-/**
- * This class represents iterative graph computations, programmed in a gather-sum-apply perspective.
- *
- * @param <K> The type of the vertex key in the graph
- * @param <VV> The type of the vertex value in the graph
- * @param <EV> The type of the edge value in the graph
- * @param <M> The intermediate type used by the gather, sum and apply functions
- */
-public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperation<Vertex<K, VV>,
-		Vertex<K, VV>> {
-
-	private DataSet<Vertex<K, VV>> vertexDataSet;
-	private DataSet<Edge<K, EV>> edgeDataSet;
-
-	private final GatherFunction<VV, EV, M> gather;
-	private final SumFunction<VV, EV, M> sum;
-	private final ApplyFunction<K, VV, M> apply;
-	private final int maximumNumberOfIterations;
-	private EdgeDirection direction = EdgeDirection.OUT;
-
-	private GSAConfiguration configuration;
-
-	// ----------------------------------------------------------------------------------
-
-	/**
-	 * Creates the iteration operator. Instances are obtained through
-	 * {@link #withEdges(DataSet, GatherFunction, SumFunction, ApplyFunction, int)}.
-	 *
-	 * @param gather the gather function; must not be null
-	 * @param sum the sum function; must not be null
-	 * @param apply the apply function; must not be null
-	 * @param edges the edge data set; must not be null
-	 * @param maximumNumberOfIterations the iteration limit; must be greater than zero
-	 */
-	private GatherSumApplyIteration(GatherFunction<VV, EV, M> gather, SumFunction<VV, EV, M> sum,
-			ApplyFunction<K, VV, M> apply, DataSet<Edge<K, EV>> edges, int maximumNumberOfIterations) {
-
-		// checkNotNull hands back its argument, so validation and assignment happen together,
-		// in the same order in which the arguments were validated before
-		this.gather = Preconditions.checkNotNull(gather);
-		this.sum = Preconditions.checkNotNull(sum);
-		this.apply = Preconditions.checkNotNull(apply);
-		this.edgeDataSet = Preconditions.checkNotNull(edges);
-
-		Preconditions.checkArgument(maximumNumberOfIterations > 0, "The maximum number of iterations must be at least one.");
-		this.maximumNumberOfIterations = maximumNumberOfIterations;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Custom Operator behavior
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Registers the input of this operator. For this operator the input is the set of
-	 * vertices, each carrying its initial state.
-	 *
-	 * @param dataSet The vertices, with their initial state, that the iteration starts from.
-	 */
-	@Override
-	public void setInput(DataSet<Vertex<K, VV>> dataSet) {
-		vertexDataSet = dataSet;
-	}
-
-	/**
-	 * Computes the results of the gather-sum-apply iteration.
-	 *
-	 * <p>The dataflow is assembled as a delta iteration over the vertex data set, keyed on
-	 * field 0: the workset is joined with the edges to produce (key, neighbor) pairs, the
-	 * gather function maps each pair to a partial value, the sum function reduces the partial
-	 * values per key, and the apply function joins the reduced values with the solution set.
-	 * The output of the apply step closes the iteration as both of its arguments.
-	 *
-	 * @return The resulting DataSet
-	 * @throws IllegalStateException if no input data set has been set via {@link #setInput(DataSet)}
-	 * @throws RuntimeException if the number of vertices was requested through the configuration
-	 *         but could not be computed
-	 */
-	@Override
-	public DataSet<Vertex<K, VV>> createResult() {
-		if (vertexDataSet == null) {
-			throw new IllegalStateException("The input data set has not been set.");
-		}
-
-		// Prepare type information: the key type is field 0 of the vertex type, the message type
-		// is the third type argument (index 2) of the concrete gather function
-		TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertexDataSet.getType()).getTypeAt(0);
-		TypeInformation<M> messageType = TypeExtractor.createTypeInfo(GatherFunction.class, gather.getClass(), 2, null, null);
-		TypeInformation<Tuple2<K, M>> innerType = new TupleTypeInfo<Tuple2<K, M>>(keyType, messageType);
-		TypeInformation<Vertex<K, VV>> outputType = vertexDataSet.getType();
-
-		// create a graph
-		Graph<K, VV, EV> graph =
-				Graph.fromDataSet(vertexDataSet, edgeDataSet, vertexDataSet.getExecutionEnvironment());
-
-		// check whether the numVertices option is set and, if so, compute the total number of vertices
-		// and set it within the gather, sum and apply functions
-		if (this.configuration != null && this.configuration.isOptNumVertices()) {
-			long numberOfVertices;
-			try {
-				numberOfVertices = graph.numberOfVertices();
-			} catch (Exception e) {
-				// The option was requested explicitly. Continuing would leave the functions without
-				// the vertex count they were configured to receive, so fail instead of only
-				// printing the stack trace.
-				throw new RuntimeException("Could not compute the number of vertices of the input graph.", e);
-			}
-			gather.setNumberOfVertices(numberOfVertices);
-			sum.setNumberOfVertices(numberOfVertices);
-			apply.setNumberOfVertices(numberOfVertices);
-		}
-
-		// Prepare UDFs
-		GatherUdf<K, VV, EV, M> gatherUdf = new GatherUdf<K, VV, EV, M>(gather, innerType);
-		SumUdf<K, VV, EV, M> sumUdf = new SumUdf<K, VV, EV, M>(sum, innerType);
-		ApplyUdf<K, VV, EV, M> applyUdf = new ApplyUdf<K, VV, EV, M>(apply, outputType);
-
-		final int[] zeroKeyPos = new int[] {0};
-		final DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration =
-				vertexDataSet.iterateDelta(vertexDataSet, maximumNumberOfIterations, zeroKeyPos);
-
-		// name used when the configuration does not supply one
-		final String defaultName = "Gather-sum-apply iteration (" + gather + " | " + sum + " | " + apply + ")";
-
-		// set up the iteration operator
-		if (this.configuration != null) {
-
-			iteration.name(this.configuration.getName(defaultName));
-			iteration.parallelism(this.configuration.getParallelism());
-			iteration.setSolutionSetUnManaged(this.configuration.isSolutionSetUnmanagedMemory());
-
-			// register all aggregators
-			for (Map.Entry<String, Aggregator<?>> entry : this.configuration.getAggregators().entrySet()) {
-				iteration.registerAggregator(entry.getKey(), entry.getValue());
-			}
-		}
-		else {
-			// no configuration provided; set default name
-			iteration.name(defaultName);
-		}
-
-		// Prepare the neighbors; a configured direction replaces the one stored in the field
-		if(this.configuration != null) {
-			direction = this.configuration.getDirection();
-		}
-		DataSet<Tuple2<K, Neighbor<VV, EV>>> neighbors;
-		switch(direction) {
-			case IN:
-				// match vertices against edge targets (field 1)
-				neighbors = iteration
-						.getWorkset().join(edgeDataSet)
-						.where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>());
-				break;
-			case ALL:
-				// both of the joins above/below, unioned
-				neighbors =  iteration
-						.getWorkset().join(edgeDataSet)
-						.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>()).union(iteration
-								.getWorkset().join(edgeDataSet)
-								.where(0).equalTo(1).with(new ProjectKeyWithNeighborIN<K, VV, EV>()));
-				break;
-			case OUT:
-			default:
-				// match vertices against edge sources (field 0); also the fallback
-				neighbors = iteration
-						.getWorkset().join(edgeDataSet)
-						.where(0).equalTo(0).with(new ProjectKeyWithNeighborOUT<K, VV, EV>());
-				break;
-		}
-
-		// Gather, sum and apply
-		MapOperator<Tuple2<K, Neighbor<VV, EV>>, Tuple2<K, M>> gatherMapOperator = neighbors.map(gatherUdf);
-
-		// configure map gather function with name and broadcast variables
-		gatherMapOperator = gatherMapOperator.name("Gather");
-
-		if (this.configuration != null) {
-			for (Tuple2<String, DataSet<?>> e : this.configuration.getGatherBcastVars()) {
-				gatherMapOperator = gatherMapOperator.withBroadcastSet(e.f1, e.f0);
-			}
-		}
-		DataSet<Tuple2<K, M>> gatheredSet = gatherMapOperator;
-
-		ReduceOperator<Tuple2<K, M>> sumReduceOperator = gatheredSet.groupBy(0).reduce(sumUdf);
-
-		// configure reduce sum function with name and broadcast variables
-		sumReduceOperator = sumReduceOperator.name("Sum");
-
-		if (this.configuration != null) {
-			for (Tuple2<String, DataSet<?>> e : this.configuration.getSumBcastVars()) {
-				sumReduceOperator = sumReduceOperator.withBroadcastSet(e.f1, e.f0);
-			}
-		}
-		DataSet<Tuple2<K, M>> summedSet = sumReduceOperator;
-
-		JoinOperator<?, ?, Vertex<K, VV>> appliedSet = summedSet
-				.join(iteration.getSolutionSet())
-				.where(0)
-				.equalTo(0)
-				.with(applyUdf);
-
-		// configure join apply function with name and broadcast variables
-		appliedSet = appliedSet.name("Apply");
-
-		if (this.configuration != null) {
-			for (Tuple2<String, DataSet<?>> e : this.configuration.getApplyBcastVars()) {
-				appliedSet = appliedSet.withBroadcastSet(e.f1, e.f0);
-			}
-		}
-
-		// let the operator know that we preserve the key field
-		appliedSet.withForwardedFieldsFirst("0").withForwardedFieldsSecond("0");
-
-		return iteration.closeWith(appliedSet, appliedSet);
-	}
-
-	/**
-	 * Factory method that builds a gather-sum-apply iteration operator over the given edges.
-	 *
-	 * @param edges The edge DataSet
-	 *
-	 * @param gather The gather function of the GSA iteration
-	 * @param sum The sum function of the GSA iteration
-	 * @param apply The apply function of the GSA iteration
-	 *
-	 * @param maximumNumberOfIterations The maximum number of iterations executed
-	 *
-	 * @param <K> The type of the vertex key in the graph
-	 * @param <VV> The type of the vertex value in the graph
-	 * @param <EV> The type of the edge value in the graph
-	 * @param <M> The intermediate type used by the gather, sum and apply functions
-	 *
-	 * @return An instance of the gather-sum-apply graph computation operator.
-	 */
-	public static final <K, VV, EV, M> GatherSumApplyIteration<K, VV, EV, M>
-		withEdges(DataSet<Edge<K, EV>> edges, GatherFunction<VV, EV, M> gather,
-		SumFunction<VV, EV, M> sum, ApplyFunction<K, VV, M> apply, int maximumNumberOfIterations) {
-
-		// note the argument order: the constructor takes the edges after the three functions
-		GatherSumApplyIteration<K, VV, EV, M> operator =
-				new GatherSumApplyIteration<K, VV, EV, M>(gather, sum, apply, edges, maximumNumberOfIterations);
-		return operator;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Wrapping UDFs
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Map function wrapping the user-provided {@link GatherFunction}: turns each
-	 * (key, neighbor) pair into a (key, gathered value) pair and drives the function's
-	 * init / pre-superstep / post-superstep hooks from {@code open} and {@code close}.
-	 */
-	@SuppressWarnings("serial")
-	@ForwardedFields("f0")
-	private static final class GatherUdf<K, VV, EV, M> extends RichMapFunction<Tuple2<K, Neighbor<VV, EV>>,
-			Tuple2<K, M>> implements ResultTypeQueryable<Tuple2<K, M>> {
-
-		private final GatherFunction<VV, EV, M> gatherFunction;
-		private transient TypeInformation<Tuple2<K, M>> resultType;
-
-		private GatherUdf(GatherFunction<VV, EV, M> gatherFunction, TypeInformation<Tuple2<K, M>> resultType) {
-			this.gatherFunction = gatherFunction;
-			this.resultType = resultType;
-		}
-
-		@Override
-		public Tuple2<K, M> map(Tuple2<K, Neighbor<VV, EV>> neighborTuple) {
-			final Neighbor<VV, EV> neighbor = neighborTuple.f1;
-			final M gathered = gatherFunction.gather(neighbor);
-			return new Tuple2<K, M>(neighborTuple.f0, gathered);
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			// the runtime context is handed to the user function only in the first superstep
-			final boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;
-			if (firstSuperstep) {
-				gatherFunction.init(getIterationRuntimeContext());
-			}
-			gatherFunction.preSuperstep();
-		}
-
-		@Override
-		public void close() throws Exception {
-			gatherFunction.postSuperstep();
-		}
-
-		@Override
-		public TypeInformation<Tuple2<K, M>> getProducedType() {
-			return resultType;
-		}
-	}
-
-	/**
-	 * Reduce function wrapping the user-provided {@link SumFunction}: combines two
-	 * (key, value) pairs into one, keeping the key of the first pair, and drives the
-	 * function's init / pre-superstep / post-superstep hooks from {@code open} and {@code close}.
-	 */
-	@SuppressWarnings("serial")
-	private static final class SumUdf<K, VV, EV, M> extends RichReduceFunction<Tuple2<K, M>>
-			implements ResultTypeQueryable<Tuple2<K, M>>{
-
-		private final SumFunction<VV, EV, M> sumFunction;
-		private transient TypeInformation<Tuple2<K, M>> resultType;
-
-		private SumUdf(SumFunction<VV, EV, M> sumFunction, TypeInformation<Tuple2<K, M>> resultType) {
-			this.sumFunction = sumFunction;
-			this.resultType = resultType;
-		}
-
-		@Override
-		public Tuple2<K, M> reduce(Tuple2<K, M> arg0, Tuple2<K, M> arg1) throws Exception {
-			final K groupKey = arg0.f0;
-			final M combined = sumFunction.sum(arg0.f1, arg1.f1);
-			return new Tuple2<K, M>(groupKey, combined);
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			// the runtime context is handed to the user function only in the first superstep
-			final boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;
-			if (firstSuperstep) {
-				sumFunction.init(getIterationRuntimeContext());
-			}
-			sumFunction.preSuperstep();
-		}
-
-		@Override
-		public void close() throws Exception {
-			sumFunction.postSuperstep();
-		}
-
-		@Override
-		public TypeInformation<Tuple2<K, M>> getProducedType() {
-			return resultType;
-		}
-	}
-
-	/**
-	 * Join function wrapping the user-provided {@link ApplyFunction}: for each pair of a
-	 * summed (key, value) tuple and the current vertex, it points the function at the output
-	 * collector and invokes it. It also drives the function's init / pre-superstep /
-	 * post-superstep hooks from {@code open} and {@code close}.
-	 */
-	@SuppressWarnings("serial")
-	private static final class ApplyUdf<K, VV, EV, M> extends RichFlatJoinFunction<Tuple2<K, M>,
-			Vertex<K, VV>, Vertex<K, VV>> implements ResultTypeQueryable<Vertex<K, VV>> {
-
-		private final ApplyFunction<K, VV, M> applyFunction;
-		private transient TypeInformation<Vertex<K, VV>> resultType;
-
-		private ApplyUdf(ApplyFunction<K, VV, M> applyFunction, TypeInformation<Vertex<K, VV>> resultType) {
-			this.applyFunction = applyFunction;
-			this.resultType = resultType;
-		}
-
-		@Override
-		public void join(Tuple2<K, M> newValue, final Vertex<K, VV> currentValue, final Collector<Vertex<K, VV>> out) throws Exception {
-			// the output must be set before apply() runs, since apply() has no collector argument
-			applyFunction.setOutput(currentValue, out);
-
-			final M summed = newValue.f1;
-			final VV current = currentValue.getValue();
-			applyFunction.apply(summed, current);
-		}
-
-		@Override
-		public void open(Configuration parameters) throws Exception {
-			// the runtime context is handed to the user function only in the first superstep
-			final boolean firstSuperstep = getIterationRuntimeContext().getSuperstepNumber() == 1;
-			if (firstSuperstep) {
-				applyFunction.init(getIterationRuntimeContext());
-			}
-			applyFunction.preSuperstep();
-		}
-
-		@Override
-		public void close() throws Exception {
-			applyFunction.postSuperstep();
-		}
-
-		@Override
-		public TypeInformation<Vertex<K, VV>> getProducedType() {
-			return resultType;
-		}
-	}
-
-	/**
-	 * Join function for the (vertex, edge) pairs matched on the edge source: emits the
-	 * edge target as key, paired with a {@link Neighbor} holding the vertex value and
-	 * the edge value.
-	 */
-	@SuppressWarnings("serial")
-	@ForwardedFieldsSecond("f1->f0")
-	private static final class ProjectKeyWithNeighborOUT<K, VV, EV> implements FlatJoinFunction<
-			Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
-
-		@Override
-		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
-			final K targetId = edge.getTarget();
-			final Neighbor<VV, EV> neighbor = new Neighbor<VV, EV>(vertex.getValue(), edge.getValue());
-			out.collect(new Tuple2<K, Neighbor<VV, EV>>(targetId, neighbor));
-		}
-	}
-
-	/**
-	 * Join function for the (vertex, edge) pairs matched on the edge target: emits the
-	 * edge source as key, paired with a {@link Neighbor} holding the vertex value and
-	 * the edge value.
-	 */
-	@SuppressWarnings("serial")
-	@ForwardedFieldsSecond({"f0"})
-	private static final class ProjectKeyWithNeighborIN<K, VV, EV> implements FlatJoinFunction<
-			Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Neighbor<VV, EV>>> {
-
-		@Override
-		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple2<K, Neighbor<VV, EV>>> out) {
-			final K sourceId = edge.getSource();
-			final Neighbor<VV, EV> neighbor = new Neighbor<VV, EV>(vertex.getValue(), edge.getValue());
-			out.collect(new Tuple2<K, Neighbor<VV, EV>>(sourceId, neighbor));
-		}
-	}
-
-
-
-
-	/**
-	 * Attaches the given configuration to this gather-sum-apply iteration. The stored
-	 * configuration is read when the result is created.
-	 *
-	 * @param parameters the configuration parameters
-	 */
-	public void configure(GSAConfiguration parameters) {
-		configuration = parameters;
-	}
-
-	/**
-	 * Returns the configuration previously passed to {@link #configure(GSAConfiguration)}.
-	 *
-	 * @return the configuration parameters of this gather-sum-apply iteration
-	 */
-	public GSAConfiguration getIterationConfiguration() {
-		return configuration;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
deleted file mode 100755
index 7fa1ed2..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/Neighbor.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * A (neighbor vertex value, edge value) pair.
- * This is a thin wrapper around {@code Tuple2<VV, EV>} that gives the two tuple fields
- * readable accessor names, for convenience in the GatherFunction.
- *
- * @param <VV> the vertex value type
- * @param <EV> the edge value type
- */
-@SuppressWarnings("serial")
-public class Neighbor<VV, EV> extends Tuple2<VV, EV> {
-
-	/** No-argument constructor. */
-	public Neighbor() {}
-
-	/**
-	 * @param neighborValue the value of the neighboring vertex, stored in field {@code f0}
-	 * @param edgeValue the value of the connecting edge, stored in field {@code f1}
-	 */
-	public Neighbor(VV neighborValue, EV edgeValue) {
-		super(neighborValue, edgeValue);
-	}
-
-	/**
-	 * @return the vertex value held in field {@code f0}
-	 */
-	public VV getNeighborValue() {
-		return f0;
-	}
-
-	/**
-	 * @return the edge value held in field {@code f1}
-	 */
-	public EV getEdgeValue() {
-		return f1;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
deleted file mode 100755
index f27e275..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/gsa/SumFunction.java
+++ /dev/null
@@ -1,134 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.gsa;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.types.Value;
-
-import java.io.Serializable;
-import java.util.Collection;
-
-/**
- * The base class for the second step of a {@link GatherSumApplyIteration}.
- *
- * <p>Implementations provide {@link #sum(Object, Object)}, which merges two partial values
- * into one. The remaining methods expose iteration state to the implementation.
- *
- * @param <VV> the vertex value type
- * @param <EV> the edge value type
- * @param <M> the output type
- */
-@SuppressWarnings("serial")
-public abstract class SumFunction<VV, EV, M> implements Serializable {
-
-	// --------------------------------------------------------------------------------------------
-	//  Attribute that allows access to the total number of vertices inside an iteration.
-	// --------------------------------------------------------------------------------------------
-
-	private long numberOfVertices = -1L;
-
-	/**
-	 * Retrieves the number of vertices in the graph.
-	 * @return the number of vertices if the {@code setOptNumVertices(boolean)} option of the
-	 * iteration configuration has been set; -1 otherwise.
-	 */
-	public long getNumberOfVertices() {
-		return numberOfVertices;
-	}
-
-	/**
-	 * Package-private setter for the vertex count.
-	 *
-	 * @param numberOfVertices the total number of vertices in the graph
-	 */
-	void setNumberOfVertices(long numberOfVertices) {
-		this.numberOfVertices = numberOfVertices;
-	}
-
-	//---------------------------------------------------------------------------------------------
-	/**
-	 * This method is invoked once per superstep, after the {@link GatherFunction} 
-	 * in a {@link GatherSumApplyIteration}.
-	 * It combines the partial values produced by {@link GatherFunction#gather(Neighbor)}
-	 * in pairs, until a single value has been computed.
-	 * 
-	 * @param arg0 the first partial value.
-	 * @param arg1 the second partial value.
-	 * @return the combined value.
-	 */
-	public abstract M sum(M arg0, M arg1);
-
-	/**
-	 * Hook executed at the start of a superstep, before the sum function is invoked.
-	 * The default implementation does nothing.
-	 */
-	public void preSuperstep() {}
-
-	/**
-	 * Hook executed at the end of a superstep, after the sum function has been invoked.
-	 * The default implementation does nothing.
-	 */
-	public void postSuperstep() {}
-
-	/**
-	 * Gets the number of the superstep, starting at <tt>1</tt>.
-	 *
-	 * @return The number of the current superstep.
-	 */
-	public int getSuperstepNumber() {
-		return runtimeContext.getSuperstepNumber();
-	}
-
-	/**
-	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
-	 * all aggregates globally once per superstep and makes them available in the next superstep.
-	 *
-	 * @param name The name of the aggregator.
-	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
-	 */
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return runtimeContext.<T>getIterationAggregator(name);
-	}
-
-	/**
-	 * Get the aggregated value that an aggregator computed in the previous iteration.
-	 *
-	 * @param name The name of the aggregator.
-	 * @return The aggregated value of the previous iteration.
-	 */
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-
-	/**
-	 * Gets the broadcast data set registered under the given name. Broadcast data sets
-	 * are available on all parallel instances of a function.
-	 *
-	 * @param name The name under which the broadcast set is registered.
-	 * @return The broadcast data set.
-	 */
-	public <T> Collection<T> getBroadcastSet(String name) {
-		return runtimeContext.<T>getBroadcastVariable(name);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Internal methods
-	// --------------------------------------------------------------------------------------------
-
-	// backs the superstep, aggregator and broadcast accessors; null until init() has been called
-	private IterationRuntimeContext runtimeContext;
-
-	/**
-	 * Stores the runtime context that the accessor methods of this class delegate to.
-	 *
-	 * @param iterationRuntimeContext the runtime context of the enclosing iteration
-	 */
-	public void init(IterationRuntimeContext iterationRuntimeContext) {
-		runtimeContext = iterationRuntimeContext;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
deleted file mode 100644
index 0dd39fc..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/CommunityDetection.java
+++ /dev/null
@@ -1,183 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-
-import java.util.Map;
-import java.util.TreeMap;
-
-/**
- * Community Detection Algorithm.
- *
- * This implementation expects Long Vertex values and labels. The Vertex values of the input Graph provide the initial label assignments.
- * 
- * Initially, each vertex is assigned a tuple formed of its own initial value along with a score equal to 1.0.
- * The vertices propagate their labels and max scores in iterations, each time adopting the label with the
- * highest score from the list of received messages. The chosen label is afterwards re-scored using the fraction
- * delta/the superstep number. Delta is passed as a parameter and has 0.5 as a default value.
- *
- * The algorithm converges when vertices no longer update their value or when the maximum number of iterations
- * is reached.
- * 
- * @param <K> the Vertex ID type 
- *
- * @see <a href="http://arxiv.org/pdf/0808.2633.pdf">article explaining the algorithm in detail</a>
- */
-public class CommunityDetection<K> implements GraphAlgorithm<K, Long, Double, Graph<K, Long, Double>> {
-
-	private Integer maxIterations;
-
-	private Double delta;
-
-	public CommunityDetection(Integer maxIterations, Double delta) {
-
-		this.maxIterations = maxIterations;
-		this.delta = delta;
-	}
-
-	@Override
-	public Graph<K, Long, Double> run(Graph<K, Long, Double> graph) {
-
-		DataSet<Vertex<K, Tuple2<Long, Double>>> initializedVertices = graph.getVertices()
-				.map(new AddScoreToVertexValuesMapper<K>());
-
-		Graph<K, Tuple2<Long, Double>, Double> graphWithScoredVertices =
-				Graph.fromDataSet(initializedVertices, graph.getEdges(), graph.getContext()).getUndirected();
-
-		return graphWithScoredVertices.runVertexCentricIteration(new VertexLabelUpdater<K>(delta),
-				new LabelMessenger<K>(), maxIterations)
-				.mapVertices(new RemoveScoreFromVertexValuesMapper<K>());
-	}
-
-	@SuppressWarnings("serial")
-	public static final class VertexLabelUpdater<K> extends VertexUpdateFunction<
-		K, Tuple2<Long, Double>, Tuple2<Long, Double>> {
-
-		private Double delta;
-
-		public VertexLabelUpdater(Double delta) {
-			this.delta = delta;
-		}
-
-		@Override
-		public void updateVertex(Vertex<K, Tuple2<Long, Double>> vertex,
-								MessageIterator<Tuple2<Long, Double>> inMessages) throws Exception {
-
-			// we would like these two maps to be ordered
-			Map<Long, Double> receivedLabelsWithScores = new TreeMap<Long, Double>();
-			Map<Long, Double> labelsWithHighestScore = new TreeMap<Long, Double>();
-
-			for (Tuple2<Long, Double> message : inMessages) {
-				// split the message into received label and score
-				Long receivedLabel = message.f0;
-				Double receivedScore = message.f1;
-
-				// if the label was received before
-				if (receivedLabelsWithScores.containsKey(receivedLabel)) {
-					Double newScore = receivedScore + receivedLabelsWithScores.get(receivedLabel);
-					receivedLabelsWithScores.put(receivedLabel, newScore);
-				} else {
-					// first time we see the label
-					receivedLabelsWithScores.put(receivedLabel, receivedScore);
-				}
-
-				// store the labels with the highest scores
-				if (labelsWithHighestScore.containsKey(receivedLabel)) {
-					Double currentScore = labelsWithHighestScore.get(receivedLabel);
-					if (currentScore < receivedScore) {
-						// record the highest score
-						labelsWithHighestScore.put(receivedLabel, receivedScore);
-					}
-				} else {
-					// first time we see this label
-					labelsWithHighestScore.put(receivedLabel, receivedScore);
-				}
-			}
-
-			if(receivedLabelsWithScores.size() > 0) {
-				// find the label with the highest score from the ones received
-				Double maxScore = -Double.MAX_VALUE;
-				Long maxScoreLabel = vertex.getValue().f0;
-				for (Long curLabel : receivedLabelsWithScores.keySet()) {
-
-					if (receivedLabelsWithScores.get(curLabel) > maxScore) {
-						maxScore = receivedLabelsWithScores.get(curLabel);
-						maxScoreLabel = curLabel;
-					}
-				}
-
-				// find the highest score of maxScoreLabel
-				Double highestScore = labelsWithHighestScore.get(maxScoreLabel);
-				// re-score the new label
-				if (maxScoreLabel != vertex.getValue().f0) {
-					highestScore -= delta / getSuperstepNumber();
-				}
-				// else delta = 0
-				// update own label
-				setNewVertexValue(new Tuple2<Long, Double>(maxScoreLabel, highestScore));
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static final class LabelMessenger<K> extends MessagingFunction<K, Tuple2<Long, Double>,
-			Tuple2<Long, Double>, Double> {
-
-		@Override
-		public void sendMessages(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
-
-			for(Edge<K, Double> edge : getEdges()) {
-				sendMessageTo(edge.getTarget(), new Tuple2<Long, Double>(vertex.getValue().f0,
-						vertex.getValue().f1 * edge.getValue()));
-			}
-		}
-	}
-
-	@SuppressWarnings("serial")
-	@ForwardedFields("f0")
-	public static final class AddScoreToVertexValuesMapper<K> implements MapFunction<
-		Vertex<K, Long>, Vertex<K, Tuple2<Long, Double>>> {
-
-		public Vertex<K, Tuple2<Long, Double>> map(Vertex<K, Long> vertex) {
-			return new Vertex<K, Tuple2<Long, Double>>(
-					vertex.getId(), new Tuple2<Long, Double>(vertex.getValue(), 1.0));
-		}
-	}
-
-	@SuppressWarnings("serial")
-	public static final class RemoveScoreFromVertexValuesMapper<K> implements MapFunction<
-		Vertex<K, Tuple2<Long, Double>>, Long> {
-
-		@Override
-		public Long map(Vertex<K, Tuple2<Long, Double>> vertex) throws Exception {
-			return vertex.getValue().f0;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
deleted file mode 100644
index ed853fe..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/ConnectedComponents.java
+++ /dev/null
@@ -1,97 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
-import org.apache.flink.types.NullValue;
-
-/**
- * Connected Components, implemented as a vertex-centric iteration.
- *
- * The vertices of the input Graph are expected to be initialized with unique, Long component IDs.
- * In each iteration the vertices send their current component ID to their neighbors and adopt the
- * smallest ID they receive, provided that it is less than the ID they currently hold.
- *
- * The algorithm converges when vertices no longer update their value or when the maximum number of
- * iterations is reached.
- *
- * The result is a DataSet of vertices, where the vertex value is the assigned component ID.
- *
- * @see org.apache.flink.graph.library.GSAConnectedComponents
- */
-@SuppressWarnings("serial")
-public class ConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
-
-	private Integer maxIterations;
-
-	public ConnectedComponents(Integer maxIterations) {
-		this.maxIterations = maxIterations;
-	}
-
-	@Override
-	public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
-		// swap the edge values for NullValue, then take the undirected graph
-		Graph<K, Long, NullValue> withoutEdgeValues = graph.mapEdges(new NullValueEdgeMapper<K, EV>());
-		Graph<K, Long, NullValue> undirected = withoutEdgeValues.getUndirected();
-
-		// run the vertex-centric iteration and hand back the resulting vertices
-		Graph<K, Long, NullValue> iterated = undirected.runVertexCentricIteration(
-				new CCUpdater<K>(), new CCMessenger<K>(), maxIterations);
-		return iterated.getVertices();
-	}
-
-	/**
-	 * Adopts the smallest ID among the incoming messages when it is below the vertex's current value.
-	 */
-	public static final class CCUpdater<K> extends VertexUpdateFunction<K, Long, Long> {
-
-		@Override
-		public void updateVertex(Vertex<K, Long> vertex, MessageIterator<Long> messages) throws Exception {
-			long smallestId = Long.MAX_VALUE;
-			for (long candidateId : messages) {
-				if (candidateId < smallestId) {
-					smallestId = candidateId;
-				}
-			}
-			// only a strictly smaller ID replaces the current value
-			if (vertex.getValue() > smallestId) {
-				setNewVertexValue(smallestId);
-			}
-		}
-	}
-
-	/**
-	 * Sends the current value of a vertex to all of its neighbors.
-	 */
-	public static final class CCMessenger<K> extends MessagingFunction<K, Long, Long, NullValue> {
-
-		@Override
-		public void sendMessages(Vertex<K, Long> vertex) throws Exception {
-			Long currentId = vertex.getValue();
-			sendMessageToAllNeighbors(currentId);
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
deleted file mode 100755
index 77bc2cf..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAConnectedComponents.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
-import org.apache.flink.types.NullValue;
-
-/**
- * This is an implementation of the Connected Components algorithm, using a gather-sum-apply iteration.
- * This implementation assumes that the vertices of the input Graph are initialized with unique, Long component IDs.
- * The result is a DataSet of vertices, where the vertex value corresponds to the assigned component ID.
- *
- * @see org.apache.flink.graph.library.ConnectedComponents
- */
-public class GSAConnectedComponents<K, EV> implements GraphAlgorithm<K, Long, EV, DataSet<Vertex<K, Long>>> {
-
-	private Integer maxIterations;
-
-	public GSAConnectedComponents(Integer maxIterations) {
-		this.maxIterations = maxIterations;
-	}
-
-	@Override
-	public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> graph) throws Exception {
-		Graph<K, Long, NullValue> undirectedGraph = graph.mapEdges(new NullValueEdgeMapper<K, EV>())
-				.getUndirected();
-		// run the gather-sum-apply iteration on the undirected graph and return the resulting vertices
-		return undirectedGraph.runGatherSumApplyIteration(new GatherNeighborIds(), new SelectMinId(),
-				new UpdateComponentId<K>(), maxIterations).getVertices();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Connected Components UDFs
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static final class GatherNeighborIds extends GatherFunction<Long, NullValue, Long> {
-		/** Gathers the value of the neighbor as it is. */
-		@Override
-		public Long gather(Neighbor<Long, NullValue> neighbor) {
-			return neighbor.getNeighborValue();
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SelectMinId extends SumFunction<Long, NullValue, Long> {
-		/** Combines two gathered values by keeping the smaller one. */
-		@Override
-		public Long sum(Long newValue, Long currentValue) {
-			return Math.min(newValue, currentValue);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class UpdateComponentId<K> extends ApplyFunction<K, Long, Long> {
-		/** Sets the combined value as the result only when it is strictly smaller than the original value. */
-		@Override
-		public void apply(Long summedValue, Long origValue) {
-			if (summedValue < origValue) {
-				setResult(summedValue);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
deleted file mode 100644
index df3e89a..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ /dev/null
@@ -1,135 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-import org.apache.flink.graph.gsa.SumFunction;
-
-/**
- * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
- * The user can define the damping factor and the maximum number of iterations.
- * If the number of vertices of the input graph is known, it should be provided as a parameter
- * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
- *
- * The implementation assumes that each page has at least one incoming and one outgoing link.
- */
-public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
-
-	private final double beta;
-	private final int maxIterations;
-	private long numberOfVertices;
-
-	/**
-	 * @param beta the damping factor
-	 * @param maxIterations the maximum number of iterations
-	 */
-	public GSAPageRank(double beta, int maxIterations) {
-		this.beta = beta;
-		this.maxIterations = maxIterations;
-	}
-
-	/** Same as the two-argument constructor, with the vertex count given up front; 0 means it is not known. */
-	public GSAPageRank(double beta, long numVertices, int maxIterations) {
-		this(beta, maxIterations);
-		this.numberOfVertices = numVertices;
-	}
-
-	@Override
-	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
-		// a count of 0 means no vertex count was supplied: ask the graph for it
-		if (numberOfVertices == 0) {
-			numberOfVertices = network.numberOfVertices();
-		}
-
-		DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
-		// InitWeightsMapper divides the first field of each joined pair by the out-degree joined on the edge source
-		Graph<K, Double, Double> networkWithWeights = network
-				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
-
-		return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
-				new UpdateRanks<K>(beta, numberOfVertices), maxIterations)
-				.getVertices();
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Page Rank UDFs
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static final class GatherRanks extends GatherFunction<Double, Double, Double> {
-		/** Vertex count behind the 1 / numberOfVertices rank used in the first superstep. */
-		private final long numberOfVertices;
-
-		public GatherRanks(long numberOfVertices) {
-			this.numberOfVertices = numberOfVertices;
-		}
-
-		@Override
-		public Double gather(Neighbor<Double, Double> neighbor) {
-			double neighborRank = neighbor.getNeighborValue();
-			// in the first superstep the neighbor value is ignored and 1 / numberOfVertices is used instead
-			if (getSuperstepNumber() == 1) {
-				neighborRank = 1.0 / numberOfVertices;
-			}
-			return neighborRank * neighbor.getEdgeValue();
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class SumRanks extends SumFunction<Double, Double, Double> {
-		/** Adds two gathered values. */
-		@Override
-		public Double sum(Double newValue, Double currentValue) {
-			return newValue + currentValue;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class UpdateRanks<K> extends ApplyFunction<K, Double, Double> {
-
-		private final double beta;
-		private final long numVertices;
-
-		public UpdateRanks(double beta, long numberOfVertices) {
-			this.beta = beta;
-			this.numVertices = numberOfVertices;
-		}
-		/** Sets the result to (1 - beta) / numVertices + beta * rankSum; the current value is not used. */
-		@Override
-		public void apply(Double rankSum, Double currentValue) {
-			setResult((1 - beta) / numVertices + beta * rankSum);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
-		@Override
-		public Double map(Tuple2<Double, Long> value) {
-			return value.f0 / value.f1; // first field divided by the second, as a double
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
deleted file mode 100755
index 5a76072..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSASingleSourceShortestPaths.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.gsa.Neighbor;
-
-/**
- * This is an implementation of the Single Source Shortest Paths algorithm, using a gather-sum-apply iteration.
- */
-public class GSASingleSourceShortestPaths<K> implements
-	GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
-
-	private final K srcVertexId;
-	private final Integer maxIterations;
-
-	public GSASingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
-		this.srcVertexId = srcVertexId;
-		this.maxIterations = maxIterations;
-	}
-
-	@Override
-	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
-		// vertex values are first replaced by their initial distances, then the iteration runs on the result
-		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
-				.runGatherSumApplyIteration(new CalculateDistances(), new ChooseMinDistance(),
-						new UpdateDistance<K>(), maxIterations).getVertices();
-	}
-
-	@SuppressWarnings("serial")
-	public static final class InitVerticesMapper<K> implements MapFunction<Vertex<K, Double>, Double> {
-
-		private final K srcVertexId;
-
-		public InitVerticesMapper(K srcId) {
-			this.srcVertexId = srcId;
-		}
-
-		/** Returns 0.0 for the vertex whose f0 equals the source id and Double.MAX_VALUE for any other vertex. */
-		@Override
-		public Double map(Vertex<K, Double> value) {
-			return value.f0.equals(srcVertexId) ? 0.0 : Double.MAX_VALUE;
-		}
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Single Source Shortest Path UDFs
-	// --------------------------------------------------------------------------------------------
-
-	@SuppressWarnings("serial")
-	private static final class CalculateDistances extends GatherFunction<Double, Double, Double> {
-		/** Returns the neighbor's value plus the value of the edge. */
-		@Override
-		public Double gather(Neighbor<Double, Double> neighbor) {
-			return neighbor.getNeighborValue() + neighbor.getEdgeValue();
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ChooseMinDistance extends SumFunction<Double, Double, Double> {
-		/** Combines two gathered distances by keeping the smaller one. */
-		@Override
-		public Double sum(Double newValue, Double currentValue) {
-			return Math.min(newValue, currentValue);
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class UpdateDistance<K> extends ApplyFunction<K, Double, Double> {
-		/** Sets the new distance as the result only when it is strictly smaller than the old one. */
-		@Override
-		public void apply(Double newDistance, Double oldDistance) {
-			if (newDistance < oldDistance) {
-				setResult(newDistance);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
deleted file mode 100644
index 76d170d..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/GSATriangleCount.java
+++ /dev/null
@@ -1,190 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.ReduceNeighborsFunction;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Triplet;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.types.NullValue;
-
-import java.util.TreeMap;
-
-/**
- * Triangle Count Algorithm.
- *
- * The computation runs in three phases. First, vertices select neighbors with id greater than theirs
- * and send messages to them. Each received message is then propagated to neighbors with higher id.
- * Finally, if a node encounters the target id in the list of received messages, it increments the
- * number of triangles found.
- *
- * This implementation is non-iterative.
- *
- * The algorithm takes an undirected, unweighted graph as input and outputs a DataSet
- * which contains a single integer representing the number of triangles.
- */
-public class GSATriangleCount<K extends Comparable<K>, VV, EV> implements
-		GraphAlgorithm<K, VV, EV, DataSet<Integer>> {
-
-	@SuppressWarnings("serial")
-	@Override
-	public DataSet<Integer> run(Graph<K, VV, EV> input) throws Exception {
-		ExecutionEnvironment context = input.getContext();
-
-		// order the edges so that src is always higher than trg, then apply distinct()
-		DataSet<Edge<K, NullValue>> orderedEdges = input.getEdges()
-				.map(new OrderEdges<K, EV>())
-				.distinct();
-
-		Graph<K, TreeMap<K, Integer>, NullValue> initialGraph =
-				Graph.fromDataSet(orderedEdges, new VertexInitializer<K>(), context);
-
-		// select neighbors with ids higher than the current vertex id
-		// Gather: a no-op in this case
-		// Sum: create the set of neighbors
-		DataSet<Tuple2<K, TreeMap<K, Integer>>> neighborSets = initialGraph
-				.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
-
-		Graph<K, TreeMap<K, Integer>, NullValue> emptyValueGraph = initialGraph
-				.mapVertices(new VertexInitializerEmptyTreeMap<K>());
-
-		// Apply: attach the computed values to the vertices
-		// joinWithVertices to update the node values
-		DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithNeighborSets = emptyValueGraph
-				.joinWithVertices(neighborSets, new AttachValues<K>())
-				.getVertices();
-
-		Graph<K, TreeMap<K, Integer>, NullValue> neighborGraph =
-				Graph.fromDataSet(verticesWithNeighborSets, orderedEdges, context);
-
-		// propagate each received value to neighbors with higher id
-		// Gather: a no-op in this case
-		// Sum: propagate values
-		DataSet<Tuple2<K, TreeMap<K, Integer>>> propagated = neighborGraph
-				.reduceOnNeighbors(new GatherHigherIdNeighbors<K>(), EdgeDirection.IN);
-
-		// Apply: attach propagated values to vertices
-		DataSet<Vertex<K, TreeMap<K, Integer>>> verticesWithPropagated = emptyValueGraph
-				.joinWithVertices(propagated, new AttachValues<K>())
-				.getVertices();
-
-		Graph<K, TreeMap<K, Integer>, NullValue> propagatedGraph = Graph.fromDataSet(
-				verticesWithPropagated, neighborGraph.getEdges(), context);
-
-		// Scatter: compute the number of triangles
-		DataSet<Integer> perTriplet = propagatedGraph.getTriplets().map(new ComputeTriangles<K>());
-		return perTriplet.reduce(new ReduceFunction<Integer>() {
-			@Override
-			public Integer reduce(Integer first, Integer second) throws Exception {
-				return first + second;
-			}
-		});
-	}
-
-	@SuppressWarnings("serial")
-	private static final class OrderEdges<K extends Comparable<K>, EV> implements
-		MapFunction<Edge<K, EV>, Edge<K, NullValue>> {
-		/** Returns a NullValue edge between the same endpoints, swapped when the source compares lower than the target. */
-		@Override
-		public Edge<K, NullValue> map(Edge<K, EV> edge) throws Exception {
-			K src = edge.getSource();
-			K trg = edge.getTarget();
-			// the endpoint that compares higher goes first; equal endpoints keep their order
-			boolean swap = src.compareTo(trg) < 0;
-			return new Edge<K, NullValue>(swap ? trg : src, swap ? src : trg, NullValue.getInstance());
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class VertexInitializer<K> implements MapFunction<K, TreeMap<K, Integer>> {
-		/** Builds the initial value for a key: a map holding only that key, mapped to 1. */
-		@Override
-		public TreeMap<K, Integer> map(K value) throws Exception {
-			// a fresh map per call, so no state is shared between invocations
-			TreeMap<K, Integer> selfOnly = new TreeMap<K, Integer>();
-			selfOnly.put(value, 1);
-			return selfOnly;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class VertexInitializerEmptyTreeMap<K> implements
-			MapFunction<Vertex<K, TreeMap<K, Integer>>, TreeMap<K, Integer>> {
-		/** Returns a new, empty map; the given vertex is not read. */
-		@Override
-		public TreeMap<K, Integer> map(Vertex<K, TreeMap<K, Integer>> vertex) throws Exception {
-			return new TreeMap<K, Integer>();
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class AttachValues<K> implements MapFunction<Tuple2<TreeMap<K, Integer>,
-			TreeMap<K, Integer>>, TreeMap<K, Integer>> {
-		/** Returns the second field of the pair; the first field is discarded. */
-		@Override
-		public TreeMap<K, Integer> map(Tuple2<TreeMap<K, Integer>, TreeMap<K, Integer>> pair) throws Exception {
-			return pair.f1;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class GatherHigherIdNeighbors<K> implements
-		ReduceNeighborsFunction<TreeMap<K,Integer>> {
-		/** Merges second into first, adding up the values of keys present in both; first is modified and returned. */
-		@Override
-		public TreeMap<K, Integer> reduceNeighbors(TreeMap<K,Integer> first, TreeMap<K,Integer> second) {
-			for (K id : second.keySet()) {
-				Integer mine = first.get(id);
-				if (mine == null) {
-					first.put(id, second.get(id));
-				} else {
-					first.put(id, mine + second.get(id));
-				}
-			}
-			return first;
-		}
-	}
-
-	@SuppressWarnings("serial")
-	private static final class ComputeTriangles<K> implements MapFunction<Triplet<K, TreeMap<K, Integer>, NullValue>,
-			Integer> {
-
-		@Override
-		public Integer map(Triplet<K, TreeMap<K, Integer>, NullValue> triplet) throws Exception {
-			K sourceId = triplet.getSrcVertex().getId();
-			TreeMap<K, Integer> targetMap = triplet.getTrgVertex().getValue();
-
-			// the target vertex's map is looked up under the source vertex id; no entry counts as zero
-			Integer stored = targetMap.get(sourceId);
-			if (stored == null) {
-				return 0;
-			}
-			return stored;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
deleted file mode 100644
index 82dfee7..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/LabelPropagation.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.graph.utils.NullValueEdgeMapper;
-import org.apache.flink.types.NullValue;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Map.Entry;
-
-/**
- * An implementation of the label propagation algorithm. The iterative algorithm
- * detects communities by propagating labels. In each iteration, a vertex adopts
- * the label that is most frequent among its neighbors' labels. Labels are
- * represented by Longs and we assume a total ordering among them, in order to
- * break ties. The algorithm converges when no vertex changes its value or the
- * maximum number of iterations has been reached. Note that different
- * initializations might lead to different results.
- *
- */
-@SuppressWarnings("serial")
-public class LabelPropagation<K extends Comparable<K>, EV> implements GraphAlgorithm<K, Long, EV,
-	DataSet<Vertex<K, Long>>> {
-
-	private final int maxIterations;
-
-	/** @param maxIterations the maximum number of iterations */
-	public LabelPropagation(int maxIterations) {
-		this.maxIterations = maxIterations;
-	}
-
-	@Override
-	public DataSet<Vertex<K, Long>> run(Graph<K, Long, EV> input) {
-
-		// iteratively adopt the most frequent label among the neighbors
-		// of each vertex
-		return input.mapEdges(new NullValueEdgeMapper<K, EV>()).runVertexCentricIteration(new UpdateVertexLabel<K>(), new SendNewLabelToNeighbors<K>(),
-				maxIterations).getVertices();
-	}
-
-	/**
-	 * Function that updates the value of a vertex by adopting the most frequent
-	 * label among the received messages. The vertex's current label is the initial
-	 * candidate with a frequency of one; a received label replaces the candidate when it
-	 * is more frequent, or equally frequent and larger.
-	 */
-	public static final class UpdateVertexLabel<K> extends VertexUpdateFunction<K, Long, Long> {
-		@Override
-		public void updateVertex(Vertex<K, Long> vertex,
-				MessageIterator<Long> inMessages) {
-			Map<Long, Long> labelsWithFrequencies = new HashMap<Long, Long>();
-
-			long maxFrequency = 1;
-			long mostFrequentLabel = vertex.getValue();
-
-			// store the labels with their frequencies
-			for (Long msg : inMessages) {
-				// one lookup per message: a missing entry means this is the first occurrence
-				Long seenSoFar = labelsWithFrequencies.get(msg);
-				labelsWithFrequencies.put(msg, seenSoFar == null ? 1L : seenSoFar + 1);
-			}
-
-			// select the most frequent label: if two or more labels have the
-			// same frequency,
-			// the node adopts the label with the highest value
-			for (Entry<Long, Long> entry : labelsWithFrequencies.entrySet()) {
-				if (entry.getValue() == maxFrequency) {
-					// check the label value to break ties
-					if (entry.getKey() > mostFrequentLabel) {
-						mostFrequentLabel = entry.getKey();
-					}
-				} else if (entry.getValue() > maxFrequency) {
-					maxFrequency = entry.getValue();
-					mostFrequentLabel = entry.getKey();
-				}
-			}
-
-			// set the new vertex value (done on every call, also when the label did not change)
-			setNewVertexValue(mostFrequentLabel);
-		}
-	}
-
-	/**
-	 * Sends the vertex label to all out-neighbors
-	 */
-	public static final class SendNewLabelToNeighbors<K> extends MessagingFunction<K, Long, Long, NullValue> {
-		@Override
-		public void sendMessages(Vertex<K, Long> vertex) {
-			sendMessageToAllNeighbors(vertex.getValue());
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
deleted file mode 100644
index 8193dba..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ /dev/null
@@ -1,146 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-
-/**
- * This is an implementation of a simple PageRank algorithm, using a vertex-centric iteration.
- * The user can define the damping factor and the maximum number of iterations.
- * If the number of vertices of the input graph is known, it should be provided as a parameter
- * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
- *
- * The implementation assumes that each page has at least one incoming and one outgoing link.
- *
- * @param <K> the type of the vertex identifiers
- */
-public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
-
-	private final double beta;
-	private final int maxIterations;
-
-	/** Vertex count supplied by the caller; 0 means "not supplied, count the vertices on each run". */
-	private final long numberOfVertices;
-
-	/**
-	 * Creates a PageRank instance that counts the vertices of the input graph itself.
-	 *
-	 * @param beta the damping factor
-	 * @param maxIterations the maximum number of iterations
-	 */
-	public PageRank(double beta, int maxIterations) {
-		this(beta, 0L, maxIterations);
-	}
-
-	/**
-	 * Creates a PageRank instance for a graph whose size is already known.
-	 *
-	 * @param beta the damping factor
-	 * @param numVertices the number of vertices in the input; 0 makes {@link #run(Graph)} count them
-	 * @param maxIterations the maximum number of iterations
-	 */
-	public PageRank(double beta, long numVertices, int maxIterations) {
-		this.beta = beta;
-		this.maxIterations = maxIterations;
-		this.numberOfVertices = numVertices;
-	}
-
-	/**
-	 * Runs the vertex-centric PageRank iteration on the given graph.
-	 *
-	 * @param network the input graph
-	 * @return the vertices of the graph after the iteration, carrying their rank as value
-	 * @throws Exception if counting the vertices or setting up the iteration fails
-	 */
-	@Override
-	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
-
-		// Resolve the vertex count per invocation instead of caching it in a field, so that
-		// reusing this instance on a second graph does not pick up the first graph's size.
-		final long vertexCount = (numberOfVertices == 0) ? network.numberOfVertices() : numberOfVertices;
-
-		DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
-
-		// replace every edge value by (edge value / out-degree of the edge's source vertex)
-		Graph<K, Double, Double> networkWithWeights = network
-				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeightsMapper());
-
-		return networkWithWeights.runVertexCentricIteration(new VertexRankUpdater<K>(beta, vertexCount),
-				new RankMessenger<K>(vertexCount), maxIterations)
-				.getVertices();
-	}
-
-	/**
-	 * Function that updates the rank of a vertex by summing up the partial
-	 * ranks from all incoming messages and then applying the dampening formula.
-	 *
-	 * @param <K> the type of the vertex identifiers
-	 */
-	@SuppressWarnings("serial")
-	public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
-
-		private final double beta;
-		private final long numVertices;
-
-		/**
-		 * @param beta the damping factor
-		 * @param numberOfVertices the number of vertices in the graph
-		 */
-		public VertexRankUpdater(double beta, long numberOfVertices) {
-			this.beta = beta;
-			this.numVertices = numberOfVertices;
-		}
-
-		/**
-		 * Sets the new rank to {@code beta * sum(messages) + (1 - beta) / numVertices}.
-		 *
-		 * @param vertex the vertex being updated
-		 * @param inMessages the partial ranks received by the vertex
-		 */
-		@Override
-		public void updateVertex(Vertex<K, Double> vertex, MessageIterator<Double> inMessages) {
-			double rankSum = 0.0;
-			for (double msg : inMessages) {
-				rankSum += msg;
-			}
-
-			// apply the dampening factor / random jump
-			double newRank = (beta * rankSum) + (1 - beta) / numVertices;
-			setNewVertexValue(newRank);
-		}
-	}
-
-	/**
-	 * Distributes the rank of a vertex among all target vertices according to
-	 * the transition probability, which is associated with an edge as the edge
-	 * value.
-	 *
-	 * @param <K> the type of the vertex identifiers
-	 */
-	@SuppressWarnings("serial")
-	public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
-
-		private final long numVertices;
-
-		/**
-		 * @param numberOfVertices the number of vertices in the graph
-		 */
-		public RankMessenger(long numberOfVertices) {
-			this.numVertices = numberOfVertices;
-		}
-
-		/**
-		 * Sends {@code rank * edge value} to the target of every edge of the given vertex.
-		 * In the first superstep the rank is first set to {@code 1 / numVertices}.
-		 *
-		 * @param vertex the vertex whose rank is distributed
-		 */
-		@Override
-		public void sendMessages(Vertex<K, Double> vertex) {
-			if (getSuperstepNumber() == 1) {
-				// initialize vertex ranks (autoboxing instead of the Double constructor)
-				vertex.setValue(1.0 / numVertices);
-			}
-
-			for (Edge<K, Double> edge : getEdges()) {
-				sendMessageTo(edge.getTarget(), vertex.getValue() * edge.getValue());
-			}
-		}
-	}
-
-	/**
-	 * Divides the first tuple field by the second one. As wired up in {@code run}, the second
-	 * field is the out-degree of the edge's source vertex; the first field is presumably the
-	 * current edge value (verify against {@code Graph#joinWithEdgesOnSource}).
-	 */
-	@SuppressWarnings("serial")
-	private static final class InitWeightsMapper implements MapFunction<Tuple2<Double, Long>, Double> {
-		@Override
-		public Double map(Tuple2<Double, Long> value) {
-			return value.f0 / value.f1;
-		}
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
deleted file mode 100644
index 60c4c17..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/library/SingleSourceShortestPaths.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.library;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.spargel.MessageIterator;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-
-/**
- * This is an implementation of the Single-Source-Shortest Paths algorithm, using a vertex-centric iteration.
- * Vertices that are never reached from the source keep the value {@link Double#MAX_VALUE}.
- *
- * @param <K> the type of the vertex identifiers
- */
-@SuppressWarnings("serial")
-public class SingleSourceShortestPaths<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
-
-	private final K srcVertexId;
-	private final Integer maxIterations;
-
-	/**
-	 * @param srcVertexId the id of the vertex from which distances are computed
-	 * @param maxIterations the maximum number of iterations
-	 */
-	public SingleSourceShortestPaths(K srcVertexId, Integer maxIterations) {
-		this.srcVertexId = srcVertexId;
-		this.maxIterations = maxIterations;
-	}
-
-	/**
-	 * Initializes the vertex values (0 for the source, {@link Double#MAX_VALUE} otherwise)
-	 * and runs the vertex-centric iteration.
-	 *
-	 * @param input the input graph; its edge values are used as edge lengths
-	 * @return the vertices of the graph, carrying their distance from the source as value
-	 */
-	@Override
-	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> input) {
-
-		return input.mapVertices(new InitVerticesMapper<K>(srcVertexId))
-				.runVertexCentricIteration(new VertexDistanceUpdater<K>(), new MinDistanceMessenger<K>(),
-				maxIterations).getVertices();
-	}
-
-	/**
-	 * Assigns the initial distance to every vertex: 0 for the source vertex and
-	 * {@link Double#MAX_VALUE} for all other vertices.
-	 *
-	 * @param <K> the type of the vertex identifiers
-	 */
-	public static final class InitVerticesMapper<K>	implements MapFunction<Vertex<K, Double>, Double> {
-
-		private final K srcVertexId;
-
-		/**
-		 * @param srcId the id of the source vertex
-		 */
-		public InitVerticesMapper(K srcId) {
-			this.srcVertexId = srcId;
-		}
-
-		@Override
-		public Double map(Vertex<K, Double> value) {
-			if (value.f0.equals(srcVertexId)) {
-				return 0.0;
-			} else {
-				return Double.MAX_VALUE;
-			}
-		}
-	}
-
-	/**
-	 * Function that updates the value of a vertex by picking the minimum
-	 * distance from all incoming messages.
-	 *
-	 * @param <K> the type of the vertex identifiers
-	 */
-	public static final class VertexDistanceUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
-
-		/**
-		 * Replaces the vertex value by the smallest received distance, if that
-		 * distance is smaller than the current value.
-		 *
-		 * @param vertex the vertex being updated
-		 * @param inMessages the candidate distances received by the vertex
-		 */
-		@Override
-		public void updateVertex(Vertex<K, Double> vertex,
-				MessageIterator<Double> inMessages) {
-
-			// primitive accumulator: avoids re-boxing on every improvement
-			double minDistance = Double.MAX_VALUE;
-
-			for (double msg : inMessages) {
-				if (msg < minDistance) {
-					minDistance = msg;
-				}
-			}
-
-			if (vertex.getValue() > minDistance) {
-				setNewVertexValue(minDistance);
-			}
-		}
-	}
-
-	/**
-	 * Distributes the minimum distance associated with a given vertex among all
-	 * the target vertices summed up with the edge's value.
-	 *
-	 * @param <K> the type of the vertex identifiers
-	 */
-	public static final class MinDistanceMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
-
-		/**
-		 * Sends {@code distance(vertex) + edge value} to the target of every edge of the vertex.
-		 * Vertices that have not been reached yet send nothing.
-		 *
-		 * @param vertex the vertex whose distance is propagated
-		 */
-		@Override
-		public void sendMessages(Vertex<K, Double> vertex)
-				throws Exception {
-			final double distance = vertex.getValue();
-
-			// Double.MAX_VALUE marks "not reached yet": such a vertex has no real distance to
-			// offer, and MAX_VALUE + weight could even yield a bogus finite candidate for
-			// large negative weights.
-			if (distance == Double.MAX_VALUE) {
-				return;
-			}
-
-			for (Edge<K, Double> edge : getEdges()) {
-				sendMessageTo(edge.getTarget(), distance + edge.getValue());
-			}
-		}
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
deleted file mode 100644
index d6fdc8a..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessageIterator.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.util.Iterator;
-
-import org.apache.flink.api.java.tuple.Tuple2;
-
-/**
- * Iterates over the messages delivered to a vertex. The class implements both
- * {@link java.util.Iterator} and {@link java.lang.Iterable} so that it can be used directly
- * in a <i>foreach</i> loop; {@link #iterator()} hands out this very object.
- *
- * @param <Message> the type of the messages
- */
-public final class MessageIterator<Message> implements Iterator<Message>, Iterable<Message>, java.io.Serializable {
-	private static final long serialVersionUID = 1L;
-
-	/** The underlying (key, message) pairs; only the message part is exposed. Not serialized. */
-	private transient Iterator<Tuple2<?, Message>> source;
-
-	/**
-	 * Attaches the pairs to read the messages from.
-	 *
-	 * @param source the iterator over (key, message) pairs
-	 */
-	final void setSource(Iterator<Tuple2<?, Message>> source) {
-		this.source = source;
-	}
-
-	/**
-	 * @return this object itself
-	 */
-	@Override
-	public Iterator<Message> iterator() {
-		return this;
-	}
-
-	@Override
-	public final boolean hasNext() {
-		return source.hasNext();
-	}
-
-	/**
-	 * @return the message part (second field) of the next pair
-	 */
-	@Override
-	public final Message next() {
-		final Tuple2<?, Message> pair = source.next();
-		return pair.f1;
-	}
-
-	/**
-	 * Not supported.
-	 *
-	 * @throws UnsupportedOperationException always
-	 */
-	@Override
-	public final void remove() {
-		throw new UnsupportedOperationException();
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
deleted file mode 100644
index 4245c24..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/spargel/MessagingFunction.java
+++ /dev/null
@@ -1,303 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.spargel;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Iterator;
-
-import org.apache.flink.api.common.aggregators.Aggregator;
-import org.apache.flink.api.common.functions.IterationRuntimeContext;
-import org.apache.flink.api.java.tuple.Tuple;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Edge;
-import org.apache.flink.graph.EdgeDirection;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.types.Value;
-import org.apache.flink.util.Collector;
-
-/**
- * The base class for functions that produce messages between vertices as a part of a {@link VertexCentricIteration}.
- *
- * @param <K> The type of the vertex key (the vertex identifier).
- * @param <VV> The type of the vertex value (the state of the vertex).
- * @param <Message> The type of the message sent between vertices along the edges.
- * @param <EV> The type of the values that are associated with the edges.
- */
-public abstract class MessagingFunction<K, VV, Message, EV> implements Serializable {
-
-	private static final long serialVersionUID = 1L;
-
-	// --------------------------------------------------------------------------------------------
-	//  Attributes that allow vertices to access their in/out degrees and the total number of vertices
-	//  inside an iteration.
-	// --------------------------------------------------------------------------------------------
-
-	private long numberOfVertices = -1L;
-
-	/**
-	 * Retrieves the number of vertices in the graph.
-	 * @return the number of vertices if the {@link IterationConfiguration#setOptNumVertices(boolean)}
-	 * option has been set; -1 otherwise.
-	 */
-	public long getNumberOfVertices() {
-		return numberOfVertices;
-	}
-
-	void setNumberOfVertices(long numberOfVertices) {
-		this.numberOfVertices = numberOfVertices;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Attribute that allows the user to choose the neighborhood type(in/out/all) on which to run
-	//  the vertex centric iteration.
-	// --------------------------------------------------------------------------------------------
-
-	private EdgeDirection direction;
-
-	/**
-	 * Retrieves the edge direction in which messages are propagated in the vertex-centric iteration.
-	 * @return the messaging {@link EdgeDirection}
-	 */
-	public EdgeDirection getDirection() {
-		return direction;
-	}
-
-	void setDirection(EdgeDirection direction) {
-		this.direction = direction;
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  Public API Methods
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * This method is invoked once per superstep for each vertex that was changed in that superstep.
-	 * It needs to produce the messages that will be received by vertices in the next superstep.
-	 *
-	 * @param vertex The vertex that was changed.
-	 *
-	 * @throws Exception The computation may throw exceptions, which causes the superstep to fail.
-	 */
-	public abstract void sendMessages(Vertex<K, VV> vertex) throws Exception;
-
-	/**
-	 * This method is executed once per superstep before the vertex update function is invoked for each vertex.
-	 * The default implementation does nothing.
-	 *
-	 * @throws Exception Exceptions in the pre-superstep phase cause the superstep to fail.
-	 */
-	public void preSuperstep() throws Exception {}
-
-	/**
-	 * This method is executed once per superstep after the vertex update function has been invoked for each vertex.
-	 * The default implementation does nothing.
-	 *
-	 * @throws Exception Exceptions in the post-superstep phase cause the superstep to fail.
-	 */
-	public void postSuperstep() throws Exception {}
-
-
-	/**
-	 * Gets an {@link java.lang.Iterable} with all edges. This method is mutually exclusive with
-	 * {@link #sendMessageToAllNeighbors(Object)} and may be called only once.
-	 * <p>
-	 * The returned iterable hands out one and the same {@link Edge} instance on every step and
-	 * only overwrites its fields, so callers must not keep references to returned edges.
-	 *
-	 * @return An iterable over the edges handed to this function for the current vertex.
-	 * @throws IllegalStateException If the edges of the current vertex have already been consumed.
-	 */
-	@SuppressWarnings("unchecked")
-	public Iterable<Edge<K, EV>> getEdges() {
-		markEdgesUsed();
-		this.edgeIterator.set((Iterator<Edge<K, EV>>) edges);
-		return this.edgeIterator;
-	}
-
-	/**
-	 * Sends the given message to all vertices that are targets of an edge of the changed vertex.
-	 * The recipient is taken from the second field (index 1) of each edge tuple.
-	 * This method is mutually exclusive to the method {@link #getEdges()} and may be called only once.
-	 *
-	 * @param m The message to send.
-	 * @throws IllegalStateException If the edges of the current vertex have already been consumed.
-	 */
-	public void sendMessageToAllNeighbors(Message m) {
-		markEdgesUsed();
-
-		outValue.f1 = m;
-
-		while (edges.hasNext()) {
-			Tuple next = (Tuple) edges.next();
-			K k = next.getField(1);
-			outValue.f0 = k;
-			out.collect(outValue);
-		}
-	}
-
-	/**
-	 * Sends the given message to the vertex identified by the given key. If the target vertex does not exist,
-	 * the next superstep will cause an exception due to a non-deliverable message.
-	 *
-	 * @param target The key (id) of the target vertex to message.
-	 * @param m The message.
-	 */
-	public void sendMessageTo(K target, Message m) {
-		outValue.f0 = target;
-		outValue.f1 = m;
-		out.collect(outValue);
-	}
-
-	// --------------------------------------------------------------------------------------------
-
-	/**
-	 * Gets the number of the superstep, starting at <tt>1</tt>.
-	 *
-	 * @return The number of the current superstep.
-	 */
-	public int getSuperstepNumber() {
-		return this.runtimeContext.getSuperstepNumber();
-	}
-
-	/**
-	 * Gets the iteration aggregator registered under the given name. The iteration aggregator combines
-	 * all aggregates globally once per superstep and makes them available in the next superstep.
-	 *
-	 * @param name The name of the aggregator.
-	 * @return The aggregator registered under this name, or null, if no aggregator was registered.
-	 */
-	public <T extends Aggregator<?>> T getIterationAggregator(String name) {
-		return this.runtimeContext.<T>getIterationAggregator(name);
-	}
-
-	/**
-	 * Get the aggregated value that an aggregator computed in the previous iteration.
-	 *
-	 * @param name The name of the aggregator.
-	 * @return The aggregated value of the previous iteration.
-	 */
-	public <T extends Value> T getPreviousIterationAggregate(String name) {
-		return this.runtimeContext.<T>getPreviousIterationAggregate(name);
-	}
-
-	/**
-	 * Gets the broadcast data set registered under the given name. Broadcast data sets
-	 * are available on all parallel instances of a function. They can be registered via
-	 * {@link org.apache.flink.graph.spargel.VertexCentricConfiguration#addBroadcastSetForMessagingFunction(String, org.apache.flink.api.java.DataSet)}.
-	 *
-	 * @param name The name under which the broadcast set is registered.
-	 * @return The broadcast data set.
-	 */
-	public <T> Collection<T> getBroadcastSet(String name) {
-		return this.runtimeContext.<T>getBroadcastVariable(name);
-	}
-
-	// --------------------------------------------------------------------------------------------
-	//  internal methods and state
-	// --------------------------------------------------------------------------------------------
-
-	/** Reused output record; its fields are overwritten for every emitted message. */
-	private Tuple2<K, Message> outValue;
-
-	private IterationRuntimeContext runtimeContext;
-
-	/** Edges of the vertex currently being processed; replaced by {@link #set(Iterator, Collector)}. */
-	private Iterator<?> edges;
-
-	private Collector<Tuple2<K, Message>> out;
-
-	private EdgesIterator<K, EV> edgeIterator;
-
-	/** Whether the edges of the current vertex have already been handed out or consumed. */
-	private boolean edgesUsed;
-
-	private long inDegree = -1;
-
-	private long outDegree = -1;
-
-	/**
-	 * Stores the runtime context and creates the reusable output record and edge iterator.
-	 *
-	 * @param context the runtime context of the iteration
-	 */
-	void init(IterationRuntimeContext context) {
-		this.runtimeContext = context;
-		this.outValue = new Tuple2<K, Message>();
-		this.edgeIterator = new EdgesIterator<K, EV>();
-	}
-
-	/**
-	 * Installs the edges and the output collector for the next vertex and re-arms the
-	 * once-only edge access check.
-	 *
-	 * @param edges the edges of the vertex to process
-	 * @param out the collector that receives the (target, message) records
-	 */
-	void set(Iterator<?> edges, Collector<Tuple2<K, Message>> out) {
-		this.edges = edges;
-		this.out = out;
-		this.edgesUsed = false;
-	}
-
-	/**
-	 * Enforces that the edges of the current vertex are consumed at most once, either through
-	 * {@link #getEdges()} or through {@link #sendMessageToAllNeighbors(Object)}.
-	 *
-	 * @throws IllegalStateException if the edges have already been consumed
-	 */
-	private void markEdgesUsed() {
-		if (edgesUsed) {
-			throw new IllegalStateException("Can use either 'getEdges()' or 'sendMessageToAllNeighbors()' exactly once.");
-		}
-		edgesUsed = true;
-	}
-
-	/**
-	 * Adapter that exposes the raw edge iterator as an {@link Iterable}. Every call to
-	 * {@link #next()} copies the fields of the next input edge into one reused {@link Edge}
-	 * instance and returns that instance.
-	 */
-	private static final class EdgesIterator<K, EV>
-		implements Iterator<Edge<K, EV>>, Iterable<Edge<K, EV>>
-	{
-		private Iterator<Edge<K, EV>> input;
-
-		private Edge<K, EV> edge = new Edge<K, EV>();
-
-		void set(Iterator<Edge<K, EV>> input) {
-			this.input = input;
-		}
-
-		@Override
-		public boolean hasNext() {
-			return input.hasNext();
-		}
-
-		@Override
-		public Edge<K, EV> next() {
-			Edge<K, EV> next = input.next();
-			edge.setSource(next.f0);
-			edge.setTarget(next.f1);
-			edge.setValue(next.f2);
-			return edge;
-		}
-
-		@Override
-		public void remove() {
-			throw new UnsupportedOperationException();
-		}
-		@Override
-		public Iterator<Edge<K, EV>> iterator() {
-			return this;
-		}
-	}
-
-	/**
-	 * Retrieves the vertex in-degree (number of in-coming edges).
-	 * @return The in-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
-	 * option has been set; -1 otherwise.
-	 */
-	public long getInDegree() {
-		return inDegree;
-	}
-
-	void setInDegree(long inDegree) {
-		this.inDegree = inDegree;
-	}
-
-	/**
-	 * Retrieve the vertex out-degree (number of out-going edges).
-	 * @return The out-degree of this vertex if the {@link IterationConfiguration#setOptDegrees(boolean)}
-	 * option has been set; -1 otherwise.
-	 */
-	public long getOutDegree() {
-		return outDegree;
-	}
-
-	void setOutDegree(long outDegree) {
-		this.outDegree = outDegree;
-	}
-}