You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/10/09 18:05:48 UTC

[10/24] flink git commit: [FLINK-2833] [gelly] create a flink-libraries module and move gelly there

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
deleted file mode 100755
index b24f749..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ /dev/null
@@ -1,1948 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Iterator;
-import java.util.NoSuchElementException;
-import java.util.List;
-import java.util.Arrays;
-
-import org.apache.flink.api.common.functions.CoGroupFunction;
-import org.apache.flink.api.common.functions.FilterFunction;
-import org.apache.flink.api.common.functions.FlatJoinFunction;
-import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
-import org.apache.flink.api.common.functions.ReduceFunction;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.ExecutionEnvironment;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
-import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
-import org.apache.flink.api.java.tuple.Tuple1;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.tuple.Tuple3;
-import org.apache.flink.api.java.tuple.Tuple4;
-import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.graph.gsa.ApplyFunction;
-import org.apache.flink.graph.gsa.GSAConfiguration;
-import org.apache.flink.graph.gsa.GatherFunction;
-import org.apache.flink.graph.gsa.GatherSumApplyIteration;
-import org.apache.flink.graph.gsa.SumFunction;
-import org.apache.flink.graph.spargel.MessagingFunction;
-import org.apache.flink.graph.spargel.VertexCentricConfiguration;
-import org.apache.flink.graph.spargel.VertexCentricIteration;
-import org.apache.flink.graph.spargel.VertexUpdateFunction;
-import org.apache.flink.graph.utils.EdgeToTuple3Map;
-import org.apache.flink.graph.utils.Tuple2ToVertexMap;
-import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
-import org.apache.flink.graph.utils.VertexToTuple2Map;
-import org.apache.flink.graph.validation.GraphValidator;
-import org.apache.flink.util.Collector;
-import org.apache.flink.types.NullValue;
-
-/**
- * Represents a Graph consisting of {@link Edge edges} and {@link Vertex
- * vertices}.
- * 
- * 
- * @see org.apache.flink.graph.Edge
- * @see org.apache.flink.graph.Vertex
- * 
- * @param <K> the key type for edge and vertex identifiers
- * @param <VV> the value type for vertices
- * @param <EV> the value type for edges
- */
-@SuppressWarnings("serial")
-public class Graph<K, VV, EV> {
-
-	private final ExecutionEnvironment context;
-	private final DataSet<Vertex<K, VV>> vertices;
-	private final DataSet<Edge<K, EV>> edges;
-
-	/**
-	 * Creates a graph from two DataSets: vertices and edges
-	 * 
-	 * @param vertices a DataSet of vertices.
-	 * @param edges a DataSet of edges.
-	 * @param context the flink execution environment.
-	 */
-	private Graph(DataSet<Vertex<K, VV>> vertices, DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
-		this.vertices = vertices;
-		this.edges = edges;
-		this.context = context;
-	}
-
-	/**
-	 * Creates a graph from a Collection of vertices and a Collection of edges.
-	 * 
-	 * @param vertices a Collection of vertices.
-	 * @param edges a Collection of edges.
-	 * @param context the flink execution environment.
-	 * @return the newly created graph.
-	 */
-	public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Vertex<K, VV>> vertices,
-			Collection<Edge<K, EV>> edges, ExecutionEnvironment context) {
-
-		return fromDataSet(context.fromCollection(vertices),
-				context.fromCollection(edges), context);
-	}
-
-	/**
-	 * Creates a graph from a Collection of edges, vertices are induced from the
-	 * edges. Vertices are created automatically and their values are set to
-	 * NullValue.
-	 * 
-	 * @param edges a Collection of vertices.
-	 * @param context the flink execution environment.
-	 * @return the newly created graph.
-	 */
-	public static <K, EV> Graph<K, NullValue, EV> fromCollection(Collection<Edge<K, EV>> edges,
-			ExecutionEnvironment context) {
-
-		return fromDataSet(context.fromCollection(edges), context);
-	}
-
-	/**
-	 * Creates a graph from a Collection of edges, vertices are induced from the
-	 * edges and vertex values are calculated by a mapper function. Vertices are
-	 * created automatically and their values are set by applying the provided
-	 * map function to the vertex ids.
-	 * 
-	 * @param edges a Collection of edges.
-	 * @param mapper the mapper function.
-	 * @param context the flink execution environment.
-	 * @return the newly created graph.
-	 */
-	public static <K, VV, EV> Graph<K, VV, EV> fromCollection(Collection<Edge<K, EV>> edges,
-			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
-
-		return fromDataSet(context.fromCollection(edges), mapper, context);
-	}
-
-	/**
-	 * Creates a graph from a DataSet of vertices and a DataSet of edges.
-	 * 
-	 * @param vertices a DataSet of vertices.
-	 * @param edges a DataSet of edges.
-	 * @param context the flink execution environment.
-	 * @return the newly created graph.
-	 */
-	public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Vertex<K, VV>> vertices,
-			DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
-
-		return new Graph<K, VV, EV>(vertices, edges, context);
-	}
-
-	/**
-	 * Creates a graph from a DataSet of edges, vertices are induced from the
-	 * edges. Vertices are created automatically and their values are set to
-	 * NullValue.
-	 * 
-	 * @param edges a DataSet of edges.
-	 * @param context the flink execution environment.
-	 * @return the newly created graph.
-	 */
-	public static <K, EV> Graph<K, NullValue, EV> fromDataSet(
-			DataSet<Edge<K, EV>> edges, ExecutionEnvironment context) {
-
-		DataSet<Vertex<K, NullValue>> vertices = edges.flatMap(new EmitSrcAndTarget<K, EV>()).distinct();
-
-		return new Graph<K, NullValue, EV>(vertices, edges, context);
-	}
-
-	private static final class EmitSrcAndTarget<K, EV> implements FlatMapFunction<
-		Edge<K, EV>, Vertex<K, NullValue>> {
-
-		public void flatMap(Edge<K, EV> edge, Collector<Vertex<K, NullValue>> out) {
-			out.collect(new Vertex<K, NullValue>(edge.f0, NullValue.getInstance()));
-			out.collect(new Vertex<K, NullValue>(edge.f1, NullValue.getInstance()));
-		}
-	}
-
-	/**
-	 * Creates a graph from a DataSet of edges, vertices are induced from the
-	 * edges and vertex values are calculated by a mapper function. Vertices are
-	 * created automatically and their values are set by applying the provided
-	 * map function to the vertex ids.
-	 * 
-	 * @param edges a DataSet of edges.
-	 * @param mapper the mapper function.
-	 * @param context the flink execution environment.
-	 * @return the newly created graph.
-	 */
-	public static <K, VV, EV> Graph<K, VV, EV> fromDataSet(DataSet<Edge<K, EV>> edges,
-			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
-
-		TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
-
-		TypeInformation<VV> valueType = TypeExtractor.createTypeInfo(
-				MapFunction.class, mapper.getClass(), 1, null, null);
-
-		@SuppressWarnings({ "unchecked", "rawtypes" })
-		TypeInformation<Vertex<K, VV>> returnType = (TypeInformation<Vertex<K, VV>>) new TupleTypeInfo(
-				Vertex.class, keyType, valueType);
-
-		DataSet<Vertex<K, VV>> vertices = edges
-				.flatMap(new EmitSrcAndTargetAsTuple1<K, EV>()).distinct()
-				.map(new MapFunction<Tuple1<K>, Vertex<K, VV>>() {
-					public Vertex<K, VV> map(Tuple1<K> value) throws Exception {
-						return new Vertex<K, VV>(value.f0, mapper.map(value.f0));
-					}
-				}).returns(returnType).withForwardedFields("f0");
-
-		return new Graph<K, VV, EV>(vertices, edges, context);
-	}
-
-	private static final class EmitSrcAndTargetAsTuple1<K, EV> implements FlatMapFunction<
-		Edge<K, EV>, Tuple1<K>> {
-
-		public void flatMap(Edge<K, EV> edge, Collector<Tuple1<K>> out) {
-			out.collect(new Tuple1<K>(edge.f0));
-			out.collect(new Tuple1<K>(edge.f1));
-		}
-	}
-
-	/**
-	 * Creates a graph from a DataSet of Tuple objects for vertices and edges.
-	 * 
-	 * Vertices with value are created from Tuple2, Edges with value are created
-	 * from Tuple3.
-	 * 
-	 * @param vertices a DataSet of Tuple2.
-	 * @param edges a DataSet of Tuple3.
-	 * @param context the flink execution environment.
-	 * @return the newly created graph.
-	 */
-	public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple2<K, VV>> vertices,
-			DataSet<Tuple3<K, K, EV>> edges, ExecutionEnvironment context) {
-
-		DataSet<Vertex<K, VV>> vertexDataSet = vertices.map(new Tuple2ToVertexMap<K, VV>());
-		DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
-		return fromDataSet(vertexDataSet, edgeDataSet, context);
-	}
-
-	/**
-	 * Creates a graph from a DataSet of Tuple objects for edges, vertices are
-	 * induced from the edges.
-	 * 
-	 * Edges with value are created from Tuple3. Vertices are created
-	 * automatically and their values are set to NullValue.
-	 * 
-	 * @param edges a DataSet of Tuple3.
-	 * @param context the flink execution environment.
-	 * @return the newly created graph.
-	 */
-	public static <K, EV> Graph<K, NullValue, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
-			ExecutionEnvironment context) {
-
-		DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
-		return fromDataSet(edgeDataSet, context);
-	}
-
-	/**
-	 * Creates a graph from a DataSet of Tuple objects for edges, vertices are
-	 * induced from the edges and vertex values are calculated by a mapper
-	 * function. Edges with value are created from Tuple3. Vertices are created
-	 * automatically and their values are set by applying the provided map
-	 * function to the vertex ids.
-	 * 
-	 * @param edges a DataSet of Tuple3.
-	 * @param mapper the mapper function.
-	 * @param context the flink execution environment.
-	 * @return the newly created graph.
-	 */
-	public static <K, VV, EV> Graph<K, VV, EV> fromTupleDataSet(DataSet<Tuple3<K, K, EV>> edges,
-			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
-
-		DataSet<Edge<K, EV>> edgeDataSet = edges.map(new Tuple3ToEdgeMap<K, EV>());
-		return fromDataSet(edgeDataSet, mapper, context);
-	}
-
-	/**
-	* Creates a Graph from a CSV file of vertices and a CSV file of edges.
-	* 
-	* @param verticesPath path to a CSV file with the Vertex data.
-	* @param edgesPath path to a CSV file with the Edge data
-	* @param context the Flink execution environment.
-	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader}, 
-	* on which calling methods to specify types of the Vertex ID, Vertex value and Edge value returns a Graph.
-	* 
-	* @see {@link org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)},
-	* {@link org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)},
-	* {@link org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)} and
-	* {@link org.apache.flink.graph.GraphCsvReader#keyType(Class)}.
-	*/
-	public static GraphCsvReader fromCsvReader(String verticesPath, String edgesPath, ExecutionEnvironment context) {
-		return new GraphCsvReader(verticesPath, edgesPath, context);
-	}
-
-	/** 
-	* Creates a graph from a CSV file of edges. Vertices will be created automatically.
-	*
-	* @param edgesPath a path to a CSV file with the Edges data
-	* @param context the execution environment.
-	* @return An instance of {@link org.apache.flink.graph.GraphCsvReader},
-	* on which calling methods to specify types of the Vertex ID, Vertex value and Edge value returns a Graph.
-	* 
-	* @see {@link org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)},
-	* {@link org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)},
-	* {@link org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)} and
-	* {@link org.apache.flink.graph.GraphCsvReader#keyType(Class)}.
-	*/
-	public static GraphCsvReader fromCsvReader(String edgesPath, ExecutionEnvironment context) {
-		return new GraphCsvReader(edgesPath, context);
-	}
-
-	/** 
-	 * Creates a graph from a CSV file of edges. Vertices will be created automatically and
-	 * Vertex values are set by the provided mapper.
-	 *
-	 * @param edgesPath a path to a CSV file with the Edge data
-	 * @param mapper the mapper function.
-	 * @param context the execution environment.
-	 * @return An instance of {@link org.apache.flink.graph.GraphCsvReader},
-	 * on which calling methods to specify types of the Vertex ID, Vertex Value and Edge value returns a Graph.
-	 * 
-	 * @see {@link org.apache.flink.graph.GraphCsvReader#types(Class, Class, Class)},
-	 * {@link org.apache.flink.graph.GraphCsvReader#vertexTypes(Class, Class)},
-	 * {@link org.apache.flink.graph.GraphCsvReader#edgeTypes(Class, Class)} and
-	 * {@link org.apache.flink.graph.GraphCsvReader#keyType(Class)}.
-	 */
-	public static <K, VV> GraphCsvReader fromCsvReader(String edgesPath,
-			final MapFunction<K, VV> mapper, ExecutionEnvironment context) {
-		return new GraphCsvReader(edgesPath, mapper, context);
-	}
-
-	/**
-	 * @return the flink execution environment.
-	 */
-	public ExecutionEnvironment getContext() {
-		return this.context;
-	}
-
-	/**
-	 * Function that checks whether a Graph is a valid Graph,
-	 * as defined by the given {@link GraphValidator}.
-	 * 
-	 * @return true if the Graph is valid.
-	 */
-	public Boolean validate(GraphValidator<K, VV, EV> validator) throws Exception {
-		return validator.validate(this);
-	}
-
-	/**
-	 * @return the vertex DataSet.
-	 */
-	public DataSet<Vertex<K, VV>> getVertices() {
-		return vertices;
-	}
-
-	/**
-	 * @return the edge DataSet.
-	 */
-	public DataSet<Edge<K, EV>> getEdges() {
-		return edges;
-	}
-
-	/**
-	 * @return the vertex DataSet as Tuple2.
-	 */
-	public DataSet<Tuple2<K, VV>> getVerticesAsTuple2() {
-		return vertices.map(new VertexToTuple2Map<K, VV>());
-	}
-
-	/**
-	 * @return the edge DataSet as Tuple3.
-	 */
-	public DataSet<Tuple3<K, K, EV>> getEdgesAsTuple3() {
-		return edges.map(new EdgeToTuple3Map<K, EV>());
-	}
-
-	/**
-	 * This method allows access to the graph's edge values along with its source and target vertex values.
-	 *
-	 * @return a triplet DataSet consisting of (srcVertexId, trgVertexId, srcVertexValue, trgVertexValue, edgeValue)
-	 */
-	public DataSet<Triplet<K, VV, EV>> getTriplets() {
-		return this.getVertices().join(this.getEdges()).where(0).equalTo(0)
-				.with(new ProjectEdgeWithSrcValue<K, VV, EV>())
-				.join(this.getVertices()).where(1).equalTo(0)
-				.with(new ProjectEdgeWithVertexValues<K, VV, EV>());
-	}
-
-	@ForwardedFieldsFirst("f1->f2")
-	@ForwardedFieldsSecond("f0; f1; f2->f3")
-	private static final class ProjectEdgeWithSrcValue<K, VV, EV> implements
-			FlatJoinFunction<Vertex<K, VV>, Edge<K, EV>, Tuple4<K, K, VV, EV>> {
-
-		@Override
-		public void join(Vertex<K, VV> vertex, Edge<K, EV> edge, Collector<Tuple4<K, K, VV, EV>> collector)
-				throws Exception {
-
-			collector.collect(new Tuple4<K, K, VV, EV>(edge.getSource(), edge.getTarget(), vertex.getValue(),
-					edge.getValue()));
-		}
-	}
-
-	@ForwardedFieldsFirst("f0; f1; f2; f3->f4")
-	@ForwardedFieldsSecond("f1->f3")
-	private static final class ProjectEdgeWithVertexValues<K, VV, EV> implements
-			FlatJoinFunction<Tuple4<K, K, VV, EV>, Vertex<K, VV>, Triplet<K, VV, EV>> {
-
-		@Override
-		public void join(Tuple4<K, K, VV, EV> tripletWithSrcValSet,
-						Vertex<K, VV> vertex, Collector<Triplet<K, VV, EV>> collector) throws Exception {
-
-			collector.collect(new Triplet<K, VV, EV>(tripletWithSrcValSet.f0, tripletWithSrcValSet.f1,
-					tripletWithSrcValSet.f2, vertex.getValue(), tripletWithSrcValSet.f3));
-		}
-	}
-
-	/**
-	 * Apply a function to the attribute of each vertex in the graph.
-	 * 
-	 * @param mapper the map function to apply.
-	 * @return a new graph
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper) {
-
-		TypeInformation<K> keyType = ((TupleTypeInfo<?>) vertices.getType()).getTypeAt(0);
-
-		TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null);
-
-		TypeInformation<Vertex<K, NV>> returnType = (TypeInformation<Vertex<K, NV>>) new TupleTypeInfo(
-				Vertex.class, keyType, valueType);
-
-		return mapVertices(mapper, returnType);
-	}
-
-	/**
-	 * Apply a function to the attribute of each vertex in the graph.
-	 *
-	 * @param mapper the map function to apply.
-	 * @param returnType the explicit return type.
-	 * @return a new graph
-	 */
-	public <NV> Graph<K, NV, EV> mapVertices(final MapFunction<Vertex<K, VV>, NV> mapper, TypeInformation<Vertex<K,NV>> returnType) {
-		DataSet<Vertex<K, NV>> mappedVertices = vertices.map(
-				new MapFunction<Vertex<K, VV>, Vertex<K, NV>>() {
-					public Vertex<K, NV> map(Vertex<K, VV> value) throws Exception {
-						return new Vertex<K, NV>(value.f0, mapper.map(value));
-					}
-				})
-				.returns(returnType)
-				.withForwardedFields("f0");
-
-		return new Graph<K, NV, EV>(mappedVertices, this.edges, this.context);
-	}
-
-	/**
-	 * Apply a function to the attribute of each edge in the graph.
-	 * 
-	 * @param mapper the map function to apply.
-	 * @return a new graph
-	 */
-	@SuppressWarnings({ "unchecked", "rawtypes" })
-	public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper) {
-
-		TypeInformation<K> keyType = ((TupleTypeInfo<?>) edges.getType()).getTypeAt(0);
-
-		TypeInformation<NV> valueType = TypeExtractor.createTypeInfo(MapFunction.class, mapper.getClass(), 1, null, null);
-
-		TypeInformation<Edge<K, NV>> returnType = (TypeInformation<Edge<K, NV>>) new TupleTypeInfo(
-				Edge.class, keyType, keyType, valueType);
-
-		return mapEdges(mapper, returnType);
-	}
-
-	/**
-	 * Apply a function to the attribute of each edge in the graph.
-	 *
-	 * @param mapper the map function to apply.
-	 * @param returnType the explicit return type.
-	 * @return a new graph
-	 */
-	public <NV> Graph<K, VV, NV> mapEdges(final MapFunction<Edge<K, EV>, NV> mapper, TypeInformation<Edge<K,NV>> returnType) {
-		DataSet<Edge<K, NV>> mappedEdges = edges.map(
-				new MapFunction<Edge<K, EV>, Edge<K, NV>>() {
-					public Edge<K, NV> map(Edge<K, EV> value) throws Exception {
-						return new Edge<K, NV>(value.f0, value.f1, mapper
-								.map(value));
-					}
-				})
-				.returns(returnType)
-				.withForwardedFields("f0; f1");
-
-		return new Graph<K, VV, NV>(this.vertices, mappedEdges, this.context);
-	}
-
-	/**
-	 * Joins the vertex DataSet of this graph with an input DataSet and applies
-	 * a UDF on the resulted values.
-	 * 
-	 * @param inputDataSet the DataSet to join with.
-	 * @param mapper the UDF map function to apply.
-	 * @return a new graph where the vertex values have been updated.
-	 */
-	public <T> Graph<K, VV, EV> joinWithVertices(DataSet<Tuple2<K, T>> inputDataSet, 
-			final MapFunction<Tuple2<VV, T>, VV> mapper) {
-
-		DataSet<Vertex<K, VV>> resultedVertices = this.getVertices()
-				.coGroup(inputDataSet).where(0).equalTo(0)
-				.with(new ApplyCoGroupToVertexValues<K, VV, T>(mapper));
-		return new Graph<K, VV, EV>(resultedVertices, this.edges, this.context);
-	}
-
-	private static final class ApplyCoGroupToVertexValues<K, VV, T>
-			implements CoGroupFunction<Vertex<K, VV>, Tuple2<K, T>, Vertex<K, VV>> {
-
-		private MapFunction<Tuple2<VV, T>, VV> mapper;
-
-		public ApplyCoGroupToVertexValues(MapFunction<Tuple2<VV, T>, VV> mapper) {
-			this.mapper = mapper;
-		}
-
-		@Override
-		public void coGroup(Iterable<Vertex<K, VV>> vertices,
-				Iterable<Tuple2<K, T>> input, Collector<Vertex<K, VV>> collector) throws Exception {
-
-			final Iterator<Vertex<K, VV>> vertexIterator = vertices.iterator();
-			final Iterator<Tuple2<K, T>> inputIterator = input.iterator();
-
-			if (vertexIterator.hasNext()) {
-				if (inputIterator.hasNext()) {
-					final Tuple2<K, T> inputNext = inputIterator.next();
-
-					collector.collect(new Vertex<K, VV>(inputNext.f0, mapper
-							.map(new Tuple2<VV, T>(vertexIterator.next().f1,
-									inputNext.f1))));
-				} else {
-					collector.collect(vertexIterator.next());
-				}
-
-			}
-		}
-	}
-
-	/**
-	 * Joins the edge DataSet with an input DataSet on a composite key of both
-	 * source and target and applies a UDF on the resulted values.
-	 * 
-	 * @param inputDataSet the DataSet to join with.
-	 * @param mapper the UDF map function to apply.
-	 * @param <T> the return type
-	 * @return a new graph where the edge values have been updated.
-	 */
-	public <T> Graph<K, VV, EV> joinWithEdges(DataSet<Tuple3<K, K, T>> inputDataSet,
-			final MapFunction<Tuple2<EV, T>, EV> mapper) {
-
-		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
-				.coGroup(inputDataSet).where(0, 1).equalTo(0, 1)
-				.with(new ApplyCoGroupToEdgeValues<K, EV, T>(mapper));
-		return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
-	}
-
-	private static final class ApplyCoGroupToEdgeValues<K, EV, T>
-			implements CoGroupFunction<Edge<K, EV>, Tuple3<K, K, T>, Edge<K, EV>> {
-
-		private MapFunction<Tuple2<EV, T>, EV> mapper;
-
-		public ApplyCoGroupToEdgeValues(MapFunction<Tuple2<EV, T>, EV> mapper) {
-			this.mapper = mapper;
-		}
-
-		@Override
-		public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Tuple3<K, K, T>> input,
-				Collector<Edge<K, EV>> collector) throws Exception {
-
-			final Iterator<Edge<K, EV>> edgesIterator = edges.iterator();
-			final Iterator<Tuple3<K, K, T>> inputIterator = input.iterator();
-
-			if (edgesIterator.hasNext()) {
-				if (inputIterator.hasNext()) {
-					final Tuple3<K, K, T> inputNext = inputIterator.next();
-
-					collector.collect(new Edge<K, EV>(inputNext.f0,
-							inputNext.f1, mapper.map(new Tuple2<EV, T>(
-									edgesIterator.next().f2, inputNext.f2))));
-				} else {
-					collector.collect(edgesIterator.next());
-				}
-			}
-		}
-	}
-
-	/**
-	 * Joins the edge DataSet with an input DataSet on the source key of the
-	 * edges and the first attribute of the input DataSet and applies a UDF on
-	 * the resulted values. In case the inputDataSet contains the same key more
-	 * than once, only the first value will be considered.
-	 * 
-	 * @param inputDataSet the DataSet to join with.
-	 * @param mapper the UDF map function to apply.
-	 * @param <T> the return type
-	 * @return a new graph where the edge values have been updated.
-	 */
-	public <T> Graph<K, VV, EV> joinWithEdgesOnSource(DataSet<Tuple2<K, T>> inputDataSet,
-			final MapFunction<Tuple2<EV, T>, EV> mapper) {
-
-		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
-				.coGroup(inputDataSet).where(0).equalTo(0)
-				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
-
-		return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
-	}
-
-	private static final class ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>
-			implements CoGroupFunction<Edge<K, EV>, Tuple2<K, T>, Edge<K, EV>> {
-
-		private MapFunction<Tuple2<EV, T>, EV> mapper;
-
-		public ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget(
-				MapFunction<Tuple2<EV, T>, EV> mapper) {
-			this.mapper = mapper;
-		}
-
-		@Override
-		public void coGroup(Iterable<Edge<K, EV>> edges,
-				Iterable<Tuple2<K, T>> input, Collector<Edge<K, EV>> collector) throws Exception {
-
-			final Iterator<Edge<K, EV>> edgesIterator = edges.iterator();
-			final Iterator<Tuple2<K, T>> inputIterator = input.iterator();
-
-			if (inputIterator.hasNext()) {
-				final Tuple2<K, T> inputNext = inputIterator.next();
-
-				while (edgesIterator.hasNext()) {
-					Edge<K, EV> edgesNext = edgesIterator.next();
-
-					collector.collect(new Edge<K, EV>(edgesNext.f0,
-							edgesNext.f1, mapper.map(new Tuple2<EV, T>(
-									edgesNext.f2, inputNext.f1))));
-				}
-
-			} else {
-				while (edgesIterator.hasNext()) {
-					collector.collect(edgesIterator.next());
-				}
-			}
-		}
-	}
-
-	/**
-	 * Joins the edge DataSet with an input DataSet on the target key of the
-	 * edges and the first attribute of the input DataSet and applies a UDF on
-	 * the resulted values. Should the inputDataSet contain the same key more
-	 * than once, only the first value will be considered.
-	 * 
-	 * @param inputDataSet the DataSet to join with.
-	 * @param mapper the UDF map function to apply.
-	 * @param <T> the return type
-	 * @return a new graph where the edge values have been updated.
-	 */
-	public <T> Graph<K, VV, EV> joinWithEdgesOnTarget(DataSet<Tuple2<K, T>> inputDataSet,
-			final MapFunction<Tuple2<EV, T>, EV> mapper) {
-
-		DataSet<Edge<K, EV>> resultedEdges = this.getEdges()
-				.coGroup(inputDataSet).where(1).equalTo(0)
-				.with(new ApplyCoGroupToEdgeValuesOnEitherSourceOrTarget<K, EV, T>(mapper));
-
-		return new Graph<K, VV, EV>(this.vertices, resultedEdges, this.context);
-	}
-
-	/**
-	 * Apply filtering functions to the graph and return a sub-graph that
-	 * satisfies the predicates for both vertices and edges.
-	 * 
-	 * @param vertexFilter the filter function for vertices.
-	 * @param edgeFilter the filter function for edges.
-	 * @return the resulting sub-graph.
-	 */
-	public Graph<K, VV, EV> subgraph(FilterFunction<Vertex<K, VV>> vertexFilter, FilterFunction<Edge<K, EV>> edgeFilter) {
-
-		DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter);
-
-		DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
-				.where(0).equalTo(0).with(new ProjectEdge<K, VV, EV>())
-				.join(filteredVertices).where(1).equalTo(0)
-				.with(new ProjectEdge<K, VV, EV>());
-
-		DataSet<Edge<K, EV>> filteredEdges = remainingEdges.filter(edgeFilter);
-
-		return new Graph<K, VV, EV>(filteredVertices, filteredEdges,
-				this.context);
-	}
-
-	/**
-	 * Apply a filtering function to the graph and return a sub-graph that
-	 * satisfies the predicates only for the vertices.
-	 * 
-	 * @param vertexFilter the filter function for vertices.
-	 * @return the resulting sub-graph.
-	 */
-	public Graph<K, VV, EV> filterOnVertices(FilterFunction<Vertex<K, VV>> vertexFilter) {
-
-		DataSet<Vertex<K, VV>> filteredVertices = this.vertices.filter(vertexFilter);
-
-		DataSet<Edge<K, EV>> remainingEdges = this.edges.join(filteredVertices)
-				.where(0).equalTo(0).with(new ProjectEdge<K, VV, EV>())
-				.join(filteredVertices).where(1).equalTo(0)
-				.with(new ProjectEdge<K, VV, EV>());
-
-		return new Graph<K, VV, EV>(filteredVertices, remainingEdges, this.context);
-	}
-
-	/**
-	 * Apply a filtering function to the graph and return a sub-graph that
-	 * satisfies the predicates only for the edges.
-	 * 
-	 * @param edgeFilter the filter function for edges.
-	 * @return the resulting sub-graph.
-	 */
-	public Graph<K, VV, EV> filterOnEdges(FilterFunction<Edge<K, EV>> edgeFilter) {
-		DataSet<Edge<K, EV>> filteredEdges = this.edges.filter(edgeFilter);
-
-		return new Graph<K, VV, EV>(this.vertices, filteredEdges, this.context);
-	}
-
-	@ForwardedFieldsFirst("f0; f1; f2")
-	private static final class ProjectEdge<K, VV, EV> implements FlatJoinFunction<
-		Edge<K, EV>, Vertex<K, VV>, Edge<K, EV>> {
-		public void join(Edge<K, EV> first, Vertex<K, VV> second, Collector<Edge<K, EV>> out) {
-			out.collect(first);
-		}
-	}
-
-	/**
-	 * Return the out-degree of all vertices in the graph
-	 * 
-	 * @return A DataSet of Tuple2<vertexId, outDegree>
-	 */
-	public DataSet<Tuple2<K, Long>> outDegrees() {
-
-		return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>());
-	}
-
-	private static final class CountNeighborsCoGroup<K, VV, EV>
-			implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Long>> {
-		@SuppressWarnings("unused")
-		public void coGroup(Iterable<Vertex<K, VV>> vertex,	Iterable<Edge<K, EV>> outEdges,
-				Collector<Tuple2<K, Long>> out) {
-			long count = 0;
-			for (Edge<K, EV> edge : outEdges) {
-				count++;
-			}
-
-			Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
-
-			if(vertexIterator.hasNext()) {
-				out.collect(new Tuple2<K, Long>(vertexIterator.next().f0, count));
-			} else {
-				throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
-			}
-		}
-	}
-
-	/**
-	 * Return the in-degree of all vertices in the graph
-	 * 
-	 * @return A DataSet of Tuple2<vertexId, inDegree>
-	 */
-	public DataSet<Tuple2<K, Long>> inDegrees() {
-
-		return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup<K, VV, EV>());
-	}
-
-	/**
-	 * Return the degree of all vertices in the graph
-	 * 
-	 * @return A DataSet of Tuple2<vertexId, degree>
-	 */
-	public DataSet<Tuple2<K, Long>> getDegrees() {
-		return outDegrees().union(inDegrees()).groupBy(0).sum(1);
-	}
-
-	/**
-	 * This operation adds all inverse-direction edges to the graph.
-	 * 
-	 * @return the undirected graph.
-	 */
-	public Graph<K, VV, EV> getUndirected() {
-
-		DataSet<Edge<K, EV>> undirectedEdges = edges.flatMap(new RegularAndReversedEdgesMap<K, EV>());
-		return new Graph<K, VV, EV>(vertices, undirectedEdges, this.context);
-	}
-
-	/**
-	 * Compute an aggregate over the edges of each vertex. The function applied
-	 * on the edges has access to the vertex value.
-	 * 
-	 * @param edgesFunction
-	 *            the function to apply to the neighborhood
-	 * @param direction
-	 *            the edge direction (in-, out-, all-)
-	 * @param <T>
-	 *            the output type
-	 * @return a dataset of a T
-	 * @throws IllegalArgumentException
-	 */
-	public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
-											EdgeDirection direction) throws IllegalArgumentException {
-
-		switch (direction) {
-		case IN:
-			return vertices.coGroup(edges).where(0).equalTo(1)
-					.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction));
-		case OUT:
-			return vertices.coGroup(edges).where(0).equalTo(0)
-					.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction));
-		case ALL:
-			return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
-					.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction));
-		default:
-			throw new IllegalArgumentException("Illegal edge direction");
-		}
-	}
-
-	/**
-	 * Compute an aggregate over the edges of each vertex. The function applied
-	 * on the edges has access to the vertex value.
-	 *
-	 * @param edgesFunction
-	 *            the function to apply to the neighborhood
-	 * @param direction
-	 *            the edge direction (in-, out-, all-)
-	 * @param <T>
-	 *            the output type
-	 * @param typeInfo the explicit return type.
-	 * @return a dataset of a T
-	 * @throws IllegalArgumentException
-	 */
-	public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
-											EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
-
-		switch (direction) {
-			case IN:
-				return vertices.coGroup(edges).where(0).equalTo(1)
-						.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
-			case OUT:
-				return vertices.coGroup(edges).where(0).equalTo(0)
-						.with(new ApplyCoGroupFunction<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
-			case ALL:
-				return vertices.coGroup(edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>()))
-						.where(0).equalTo(0).with(new ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>(edgesFunction)).returns(typeInfo);
-			default:
-				throw new IllegalArgumentException("Illegal edge direction");
-		}
-	}
-
-	/**
-	 * Compute an aggregate over the edges of each vertex. The function applied
-	 * on the edges only has access to the vertex id (not the vertex value).
-	 * 
-	 * @param edgesFunction
-	 *            the function to apply to the neighborhood
-	 * @param direction
-	 *            the edge direction (in-, out-, all-)
-	 * @param <T>
-	 *            the output type
-	 * @return a dataset of T
-	 * @throws IllegalArgumentException
-	 */
-	public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
-											EdgeDirection direction) throws IllegalArgumentException {
-
-		switch (direction) {
-		case IN:
-			return edges.map(new ProjectVertexIdMap<K, EV>(1))
-					.withForwardedFields("f1->f0")
-					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
-		case OUT:
-			return edges.map(new ProjectVertexIdMap<K, EV>(0))
-					.withForwardedFields("f0")
-					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
-		case ALL:
-			return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
-					.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction));
-		default:
-			throw new IllegalArgumentException("Illegal edge direction");
-		}
-	}
-
-	/**
-	 * Compute an aggregate over the edges of each vertex. The function applied
-	 * on the edges only has access to the vertex id (not the vertex value).
-	 *
-	 * @param edgesFunction
-	 *            the function to apply to the neighborhood
-	 * @param direction
-	 *            the edge direction (in-, out-, all-)
-	 * @param <T>
-	 *            the output type
-	 * @param typeInfo the explicit return type.
-	 * @return a dataset of T
-	 * @throws IllegalArgumentException
-	 */
-	public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
-											EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
-
-		switch (direction) {
-			case IN:
-				return edges.map(new ProjectVertexIdMap<K, EV>(1))
-						.withForwardedFields("f1->f0")
-						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
-			case OUT:
-				return edges.map(new ProjectVertexIdMap<K, EV>(0))
-						.withForwardedFields("f0")
-						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
-			case ALL:
-				return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
-						.groupBy(0).reduceGroup(new ApplyGroupReduceFunction<K, EV, T>(edgesFunction)).returns(typeInfo);
-			default:
-				throw new IllegalArgumentException("Illegal edge direction");
-		}
-	}
-
-	private static final class ProjectVertexIdMap<K, EV> implements MapFunction<
-		Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
-
-		private int fieldPosition;
-
-		public ProjectVertexIdMap(int position) {
-			this.fieldPosition = position;
-		}
-
-		@SuppressWarnings("unchecked")
-		public Tuple2<K, Edge<K, EV>> map(Edge<K, EV> edge) {
-			return new Tuple2<K, Edge<K, EV>>((K) edge.getField(fieldPosition),	edge);
-		}
-	}
-
-	private static final class ProjectVertexWithEdgeValueMap<K, EV>	implements MapFunction<
-		Edge<K, EV>, Tuple2<K, EV>> {
-
-		private int fieldPosition;
-
-		public ProjectVertexWithEdgeValueMap(int position) {
-			this.fieldPosition = position;
-		}
-
-		@SuppressWarnings("unchecked")
-		public Tuple2<K, EV> map(Edge<K, EV> edge) {
-			return new Tuple2<K, EV>((K) edge.getField(fieldPosition),	edge.getValue());
-		}
-	}
-
-	private static final class ApplyGroupReduceFunction<K, EV, T> implements GroupReduceFunction<
-		Tuple2<K, Edge<K, EV>>, T>,	ResultTypeQueryable<T> {
-
-		private EdgesFunction<K, EV, T> function;
-
-		public ApplyGroupReduceFunction(EdgesFunction<K, EV, T> fun) {
-			this.function = fun;
-		}
-
-		public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<T> out) throws Exception {
-			function.iterateEdges(edges, out);
-		}
-
-		@Override
-		public TypeInformation<T> getProducedType() {
-			return TypeExtractor.createTypeInfo(EdgesFunction.class, function.getClass(), 2, null, null);
-		}
-	}
-
-	private static final class EmitOneEdgePerNode<K, VV, EV> implements FlatMapFunction<
-		Edge<K, EV>, Tuple2<K, Edge<K, EV>>> {
-
-		public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, Edge<K, EV>>> out) {
-			out.collect(new Tuple2<K, Edge<K, EV>>(edge.getSource(), edge));
-			out.collect(new Tuple2<K, Edge<K, EV>>(edge.getTarget(), edge));
-		}
-	}
-
-	private static final class EmitOneVertexWithEdgeValuePerNode<K, EV>	implements FlatMapFunction<
-		Edge<K, EV>, Tuple2<K, EV>> {
-
-		public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, EV>> out) {
-			out.collect(new Tuple2<K, EV>(edge.getSource(), edge.getValue()));
-			out.collect(new Tuple2<K, EV>(edge.getTarget(), edge.getValue()));
-		}
-	}
-
-	private static final class EmitOneEdgeWithNeighborPerNode<K, EV> implements FlatMapFunction<
-		Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> {
-
-		public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, Edge<K, EV>>> out) {
-			out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getSource(), edge.getTarget(), edge));
-			out.collect(new Tuple3<K, K, Edge<K, EV>>(edge.getTarget(), edge.getSource(), edge));
-		}
-	}
-
-	private static final class ApplyCoGroupFunction<K, VV, EV, T> implements CoGroupFunction<
-		Vertex<K, VV>, Edge<K, EV>, T>, ResultTypeQueryable<T> {
-
-		private EdgesFunctionWithVertexValue<K, VV, EV, T> function;
-
-		public ApplyCoGroupFunction(EdgesFunctionWithVertexValue<K, VV, EV, T> fun) {
-			this.function = fun;
-		}
-
-		public void coGroup(Iterable<Vertex<K, VV>> vertex,
-				Iterable<Edge<K, EV>> edges, Collector<T> out) throws Exception {
-
-			Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
-
-			if(vertexIterator.hasNext()) {
-				function.iterateEdges(vertexIterator.next(), edges, out);
-			} else {
-				throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
-			}
-		}
-
-		@Override
-		public TypeInformation<T> getProducedType() {
-			return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3,
-					null, null);
-		}
-	}
-
-	private static final class ApplyCoGroupFunctionOnAllEdges<K, VV, EV, T>
-			implements	CoGroupFunction<Vertex<K, VV>, Tuple2<K, Edge<K, EV>>, T>, ResultTypeQueryable<T> {
-
-		private EdgesFunctionWithVertexValue<K, VV, EV, T> function;
-
-		public ApplyCoGroupFunctionOnAllEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> fun) {
-			this.function = fun;
-		}
-
-		public void coGroup(Iterable<Vertex<K, VV>> vertex,	final Iterable<Tuple2<K, Edge<K, EV>>> keysWithEdges,
-				Collector<T> out) throws Exception {
-
-			final Iterator<Edge<K, EV>> edgesIterator = new Iterator<Edge<K, EV>>() {
-
-				final Iterator<Tuple2<K, Edge<K, EV>>> keysWithEdgesIterator = keysWithEdges.iterator();
-
-				@Override
-				public boolean hasNext() {
-					return keysWithEdgesIterator.hasNext();
-				}
-
-				@Override
-				public Edge<K, EV> next() {
-					return keysWithEdgesIterator.next().f1;
-				}
-
-				@Override
-				public void remove() {
-					keysWithEdgesIterator.remove();
-				}
-			};
-
-			Iterable<Edge<K, EV>> edgesIterable = new Iterable<Edge<K, EV>>() {
-				public Iterator<Edge<K, EV>> iterator() {
-					return edgesIterator;
-				}
-			};
-
-			Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
-
-			if(vertexIterator.hasNext()) {
-				function.iterateEdges(vertexIterator.next(), edgesIterable, out);
-			} else {
-				throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
-			}
-		}
-
-		@Override
-		public TypeInformation<T> getProducedType() {
-			return TypeExtractor.createTypeInfo(EdgesFunctionWithVertexValue.class, function.getClass(), 3,
-					null, null);
-		}
-	}
-
-	@ForwardedFields("f0->f1; f1->f0; f2")
-	private static final class ReverseEdgesMap<K, EV>
-			implements MapFunction<Edge<K, EV>, Edge<K, EV>> {
-
-		public Edge<K, EV> map(Edge<K, EV> value) {
-			return new Edge<K, EV>(value.f1, value.f0, value.f2);
-		}
-	}
-
-	private static final class RegularAndReversedEdgesMap<K, EV>
-			implements FlatMapFunction<Edge<K, EV>, Edge<K, EV>> {
-
-		@Override
-		public void flatMap(Edge<K, EV> edge, Collector<Edge<K, EV>> out) throws Exception {
-			out.collect(new Edge<K, EV>(edge.f0, edge.f1, edge.f2));
-			out.collect(new Edge<K, EV>(edge.f1, edge.f0, edge.f2));
-		}
-	}
-
-	/**
-	 * Reverse the direction of the edges in the graph
-	 * 
-	 * @return a new graph with all edges reversed
-	 * @throws UnsupportedOperationException
-	 */
-	public Graph<K, VV, EV> reverse() throws UnsupportedOperationException {
-		DataSet<Edge<K, EV>> reversedEdges = edges.map(new ReverseEdgesMap<K, EV>());
-		return new Graph<K, VV, EV>(vertices, reversedEdges, this.context);
-	}
-
-	/**
-	 * @return a long integer representing the number of vertices
-	 */
-	public long numberOfVertices() throws Exception {
-		return vertices.count();
-	}
-
-	/**
-	 * @return a long integer representing the number of edges
-	 */
-	public long numberOfEdges() throws Exception {
-		return edges.count();
-	}
-
-	/**
-	 * @return The IDs of the vertices as DataSet
-	 */
-	public DataSet<K> getVertexIds() {
-		return vertices.map(new ExtractVertexIDMapper<K, VV>());
-	}
-
-	private static final class ExtractVertexIDMapper<K, VV>
-			implements MapFunction<Vertex<K, VV>, K> {
-		@Override
-		public K map(Vertex<K, VV> vertex) {
-			return vertex.f0;
-		}
-	}
-
-	/**
-	 * @return The IDs of the edges as DataSet
-	 */
-	public DataSet<Tuple2<K, K>> getEdgeIds() {
-		return edges.map(new ExtractEdgeIDsMapper<K, EV>());
-	}
-
-	@ForwardedFields("f0; f1")
-	private static final class ExtractEdgeIDsMapper<K, EV>
-			implements MapFunction<Edge<K, EV>, Tuple2<K, K>> {
-		@Override
-		public Tuple2<K, K> map(Edge<K, EV> edge) throws Exception {
-			return new Tuple2<K, K>(edge.f0, edge.f1);
-		}
-	}
-
-	/**
-	 * Adds the input vertex to the graph. If the vertex already
-	 * exists in the graph, it will not be added again.
-	 * 
-	 * @param vertex the vertex to be added
-	 * @return the new graph containing the existing vertices as well as the one just added
-	 */
-	public Graph<K, VV, EV> addVertex(final Vertex<K, VV> vertex) {
-		List<Vertex<K, VV>> newVertex = new ArrayList<Vertex<K, VV>>();
-		newVertex.add(vertex);
-
-		return addVertices(newVertex);
-	}
-
-	/**
-	 * Adds the list of vertices, passed as input, to the graph.
-	 * If the vertices already exist in the graph, they will not be added once more.
-	 *
-	 * @param verticesToAdd the list of vertices to add
-	 * @return the new graph containing the existing and newly added vertices
-	 */
-	public Graph<K, VV, EV> addVertices(List<Vertex<K, VV>> verticesToAdd) {
-		// Add the vertices
-		DataSet<Vertex<K, VV>> newVertices = this.vertices.union(this.context.fromCollection(verticesToAdd)).distinct();
-
-		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
-	}
-
-	/**
-	 * Adds the given edge to the graph. If the source and target vertices do
-	 * not exist in the graph, they will also be added.
-	 * 
-	 * @param source the source vertex of the edge
-	 * @param target the target vertex of the edge
-	 * @param edgeValue the edge value
-	 * @return the new graph containing the existing vertices and edges plus the
-	 *         newly added edge
-	 */
-	public Graph<K, VV, EV> addEdge(Vertex<K, VV> source, Vertex<K, VV> target, EV edgeValue) {
-		Graph<K, VV, EV> partialGraph = fromCollection(Arrays.asList(source, target),
-				Arrays.asList(new Edge<K, EV>(source.f0, target.f0, edgeValue)),
-				this.context);
-		return this.union(partialGraph);
-	}
-
-	/**
-	 * Adds the given list edges to the graph.
-	 *
-	 * When adding an edge for a non-existing set of vertices, the edge is considered invalid and ignored.
-	 *
-	 * @param newEdges the data set of edges to be added
-	 * @return a new graph containing the existing edges plus the newly added edges.
-	 */
-	public Graph<K, VV, EV> addEdges(List<Edge<K, EV>> newEdges) {
-
-		DataSet<Edge<K,EV>> newEdgesDataSet = this.context.fromCollection(newEdges);
-
-		DataSet<Edge<K,EV>> validNewEdges = this.getVertices().join(newEdgesDataSet)
-				.where(0).equalTo(0)
-				.with(new JoinVerticesWithEdgesOnSrc<K, VV, EV>())
-				.join(this.getVertices()).where(1).equalTo(0)
-				.with(new JoinWithVerticesOnTrg<K, VV, EV>());
-
-		return Graph.fromDataSet(this.vertices, this.edges.union(validNewEdges), this.context);
-	}
-
-	@ForwardedFieldsSecond("f0; f1; f2")
-	private static final class JoinVerticesWithEdgesOnSrc<K, VV, EV> implements
-			JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
-
-		@Override
-		public Edge<K, EV> join(Vertex<K, VV> vertex, Edge<K, EV> edge) throws Exception {
-			return edge;
-		}
-	}
-
-	@ForwardedFieldsFirst("f0; f1; f2")
-	private static final class JoinWithVerticesOnTrg<K, VV, EV> implements
-			JoinFunction<Edge<K, EV>, Vertex<K, VV>, Edge<K, EV>> {
-
-		@Override
-		public Edge<K, EV> join(Edge<K, EV> edge, Vertex<K, VV> vertex) throws Exception {
-			return edge;
-		}
-	}
-
-	/**
-	 * Removes the given vertex and its edges from the graph.
-	 * 
-	 * @param vertex the vertex to remove
-	 * @return the new graph containing the existing vertices and edges without
-	 *         the removed vertex and its edges
-	 */
-	public Graph<K, VV, EV> removeVertex(Vertex<K, VV> vertex) {
-
-		List<Vertex<K, VV>> vertexToBeRemoved = new ArrayList<Vertex<K, VV>>();
-		vertexToBeRemoved.add(vertex);
-
-		return removeVertices(vertexToBeRemoved);
-	}
-	/**
-	 * Removes the given list of vertices and its edges from the graph.
-	 *
-	 * @param verticesToBeRemoved the list of vertices to be removed
-	 * @return the resulted graph containing the initial vertices and edges minus the vertices
-	 * 		   and edges removed.
-	 */
-
-	public Graph<K, VV, EV> removeVertices(List<Vertex<K, VV>> verticesToBeRemoved)
-	{
-		return removeVertices(this.context.fromCollection(verticesToBeRemoved));
-	}
-
-	/**
-	 * Removes the given list of vertices and its edges from the graph.
-	 *
-	 * @param verticesToBeRemoved the DataSet of vertices to be removed
-	 * @return the resulted graph containing the initial vertices and edges minus the vertices
-	 * 		   and edges removed.
-	 */
-	private Graph<K, VV, EV> removeVertices(DataSet<Vertex<K, VV>> verticesToBeRemoved) {
-
-		DataSet<Vertex<K, VV>> newVertices = getVertices().coGroup(verticesToBeRemoved).where(0).equalTo(0)
-				.with(new VerticesRemovalCoGroup<K, VV>());
-
-		DataSet < Edge < K, EV >> newEdges = newVertices.join(getEdges()).where(0).equalTo(0)
-				// if the edge source was removed, the edge will also be removed
-				.with(new ProjectEdgeToBeRemoved<K, VV, EV>())
-				// if the edge target was removed, the edge will also be removed
-				.join(newVertices).where(1).equalTo(0)
-				.with(new ProjectEdge<K, VV, EV>());
-
-		return new Graph<K, VV, EV>(newVertices, newEdges, context);
-	}
-
-	private static final class VerticesRemovalCoGroup<K, VV> implements CoGroupFunction<Vertex<K, VV>, Vertex<K, VV>, Vertex<K, VV>> {
-
-		@Override
-		public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Vertex<K, VV>> vertexToBeRemoved,
-							Collector<Vertex<K, VV>> out) throws Exception {
-
-			final Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
-			final Iterator<Vertex<K, VV>> vertexToBeRemovedIterator = vertexToBeRemoved.iterator();
-			Vertex<K, VV> next;
-
-			if (vertexIterator.hasNext()) {
-				if (!vertexToBeRemovedIterator.hasNext()) {
-					next = vertexIterator.next();
-					out.collect(next);
-				}
-			}
-		}
-	}
-
-
-
-	@ForwardedFieldsSecond("f0; f1; f2")
-	private static final class ProjectEdgeToBeRemoved<K,VV,EV> implements JoinFunction<Vertex<K, VV>, Edge<K, EV>, Edge<K, EV>> {
-		@Override
-		public Edge<K, EV> join(Vertex<K, VV> vertex, Edge<K, EV> edge) throws Exception {
-			return edge;
-		}
-	}
-
-	 /**
-	 * Removes all edges that match the given edge from the graph.
-	 * 
-	 * @param edge the edge to remove
-	 * @return the new graph containing the existing vertices and edges without
-	 *         the removed edges
-	 */
-	public Graph<K, VV, EV> removeEdge(Edge<K, EV> edge) {
-		DataSet<Edge<K, EV>> newEdges = getEdges().filter(new EdgeRemovalEdgeFilter<K, EV>(edge));
-		return new Graph<K, VV, EV>(this.vertices, newEdges, this.context);
-	}
-
-	private static final class EdgeRemovalEdgeFilter<K, EV>
-			implements FilterFunction<Edge<K, EV>> {
-		private Edge<K, EV> edgeToRemove;
-
-		public EdgeRemovalEdgeFilter(Edge<K, EV> edge) {
-			edgeToRemove = edge;
-		}
-
-		@Override
-		public boolean filter(Edge<K, EV> edge) {
-			return (!(edge.f0.equals(edgeToRemove.f0) && edge.f1
-					.equals(edgeToRemove.f1)));
-		}
-	}
-
-	/**
-	 * Removes all the edges that match the edges in the given data set from the graph.
-	 *
-	 * @param edgesToBeRemoved the list of edges to be removed
-	 * @return a new graph where the edges have been removed and in which the vertices remained intact
-	 */
-	public Graph<K, VV, EV> removeEdges(List<Edge<K, EV>> edgesToBeRemoved) {
-
-		DataSet<Edge<K, EV>> newEdges = getEdges().coGroup(this.context.fromCollection(edgesToBeRemoved))
-				.where(0,1).equalTo(0,1).with(new EdgeRemovalCoGroup<K, EV>());
-
-		return new Graph<K, VV, EV>(this.vertices, newEdges, context);
-	}
-
-	private static final class EdgeRemovalCoGroup<K,EV> implements CoGroupFunction<Edge<K, EV>, Edge<K, EV>, Edge<K, EV>> {
-
-		@Override
-		public void coGroup(Iterable<Edge<K, EV>> edge, Iterable<Edge<K, EV>> edgeToBeRemoved,
-							Collector<Edge<K, EV>> out) throws Exception {
-
-			final Iterator<Edge<K, EV>> edgeIterator = edge.iterator();
-			final Iterator<Edge<K, EV>> edgeToBeRemovedIterator = edgeToBeRemoved.iterator();
-			Edge<K, EV> next;
-
-			if (edgeIterator.hasNext()) {
-				if (!edgeToBeRemovedIterator.hasNext()) {
-					next = edgeIterator.next();
-					out.collect(next);
-				}
-			}
-		}
-	}
-
-	/**
-	 * Performs union on the vertices and edges sets of the input graphs
-	 * removing duplicate vertices but maintaining duplicate edges.
-	 * 
-	 * @param graph the graph to perform union with
-	 * @return a new graph
-	 */
-	public Graph<K, VV, EV> union(Graph<K, VV, EV> graph) {
-
-		DataSet<Vertex<K, VV>> unionedVertices = graph.getVertices().union(this.getVertices()).distinct();
-		DataSet<Edge<K, EV>> unionedEdges = graph.getEdges().union(this.getEdges());
-		return new Graph<K, VV, EV>(unionedVertices, unionedEdges, this.context);
-	}
-
-	/**
-	 * Performs Difference on the vertex and edge sets of the input graphs
-	 * removes common vertices and edges. If a source/target vertex is removed, its corresponding edge will also be removed
-	 * @param graph the graph to perform difference with
-	 * @return a new graph where the common vertices and edges have been removed
-	 */
-	public Graph<K,VV,EV> difference(Graph<K,VV,EV> graph) {
-		DataSet<Vertex<K,VV>> removeVerticesData = graph.getVertices();
-		return this.removeVertices(removeVerticesData);
-	}
-
-	/**
-	 * Runs a Vertex-Centric iteration on the graph.
-	 * No configuration options are provided.
-	 *
-	 * @param vertexUpdateFunction the vertex update function
-	 * @param messagingFunction the messaging function
-	 * @param maximumNumberOfIterations maximum number of iterations to perform
-	 * 
-	 * @return the updated Graph after the vertex-centric iteration has converged or
-	 * after maximumNumberOfIterations.
-	 */
-	public <M> Graph<K, VV, EV> runVertexCentricIteration(
-			VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
-			MessagingFunction<K, VV, M, EV> messagingFunction,
-			int maximumNumberOfIterations) {
-
-		return this.runVertexCentricIteration(vertexUpdateFunction, messagingFunction,
-				maximumNumberOfIterations, null);
-	}
-
-	/**
-	 * Runs a Vertex-Centric iteration on the graph with configuration options.
-	 * 
-	 * @param vertexUpdateFunction the vertex update function
-	 * @param messagingFunction the messaging function
-	 * @param maximumNumberOfIterations maximum number of iterations to perform
-	 * @param parameters the iteration configuration parameters
-	 * 
-	 * @return the updated Graph after the vertex-centric iteration has converged or
-	 * after maximumNumberOfIterations.
-	 */
-	public <M> Graph<K, VV, EV> runVertexCentricIteration(
-			VertexUpdateFunction<K, VV, M> vertexUpdateFunction,
-			MessagingFunction<K, VV, M, EV> messagingFunction,
-			int maximumNumberOfIterations, VertexCentricConfiguration parameters) {
-
-		VertexCentricIteration<K, VV, M, EV> iteration = VertexCentricIteration.withEdges(
-				edges, vertexUpdateFunction, messagingFunction, maximumNumberOfIterations);
-
-		iteration.configure(parameters);
-
-		DataSet<Vertex<K, VV>> newVertices = this.getVertices().runOperation(iteration);
-
-		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
-	}
-
-	/**
-	 * Runs a Gather-Sum-Apply iteration on the graph.
-	 * No configuration options are provided.
-	 *
-	 * @param gatherFunction the gather function collects information about adjacent vertices and edges
-	 * @param sumFunction the sum function aggregates the gathered information
-	 * @param applyFunction the apply function updates the vertex values with the aggregates
-	 * @param maximumNumberOfIterations maximum number of iterations to perform
-	 * @param <M> the intermediate type used between gather, sum and apply
-	 *
-	 * @return the updated Graph after the gather-sum-apply iteration has converged or
-	 * after maximumNumberOfIterations.
-	 */
-	public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
-			GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
-			ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations) {
-
-		return this.runGatherSumApplyIteration(gatherFunction, sumFunction, applyFunction,
-				maximumNumberOfIterations, null);
-	}
-
-	/**
-	 * Runs a Gather-Sum-Apply iteration on the graph with configuration options.
-	 *
-	 * @param gatherFunction the gather function collects information about adjacent vertices and edges
-	 * @param sumFunction the sum function aggregates the gathered information
-	 * @param applyFunction the apply function updates the vertex values with the aggregates
-	 * @param maximumNumberOfIterations maximum number of iterations to perform
-	 * @param parameters the iteration configuration parameters
-	 * @param <M> the intermediate type used between gather, sum and apply
-	 *
-	 * @return the updated Graph after the gather-sum-apply iteration has converged or
-	 * after maximumNumberOfIterations.
-	 */
-	public <M> Graph<K, VV, EV> runGatherSumApplyIteration(
-			GatherFunction<VV, EV, M> gatherFunction, SumFunction<VV, EV, M> sumFunction,
-			ApplyFunction<K, VV, M> applyFunction, int maximumNumberOfIterations,
-			GSAConfiguration parameters) {
-
-		GatherSumApplyIteration<K, VV, EV, M> iteration = GatherSumApplyIteration.withEdges(
-				edges, gatherFunction, sumFunction, applyFunction, maximumNumberOfIterations);
-
-		iteration.configure(parameters);
-
-		DataSet<Vertex<K, VV>> newVertices = vertices.runOperation(iteration);
-
-		return new Graph<K, VV, EV>(newVertices, this.edges, this.context);
-	}
-
-	/**
-	 * @param algorithm the algorithm to run on the Graph
-	 * @param <T> the return type
-	 * @return the result of the graph algorithm
-	 * @throws Exception
-	 */
-	public <T> T run(GraphAlgorithm<K, VV, EV, T> algorithm) throws Exception {
-		return algorithm.run(this);
-	}
-
-	/**
-	 * Compute an aggregate over the neighbors (edges and vertices) of each
-	 * vertex. The function applied on the neighbors has access to the vertex
-	 * value.
-	 * 
-	 * @param neighborsFunction the function to apply to the neighborhood
-	 * @param direction the edge direction (in-, out-, all-)
-	 * @param <T> the output type
-	 * @return a dataset of a T
-	 * @throws IllegalArgumentException
-	 */
-	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
-												EdgeDirection direction) throws IllegalArgumentException {
-		switch (direction) {
-		case IN:
-			// create <edge-sourceVertex> pairs
-			DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
-					.join(this.vertices).where(0).equalTo(0);
-			return vertices.coGroup(edgesWithSources)
-					.where(0).equalTo("f0.f1")
-					.with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
-		case OUT:
-			// create <edge-targetVertex> pairs
-			DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
-					.join(this.vertices).where(1).equalTo(0);
-			return vertices.coGroup(edgesWithTargets)
-					.where(0).equalTo("f0.f0")
-					.with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction));
-		case ALL:
-			// create <edge-sourceOrTargetVertex> pairs
-			DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
-					.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
-					.join(this.vertices).where(1).equalTo(0)
-					.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
-
-			return vertices.coGroup(edgesWithNeighbors)
-					.where(0).equalTo(0)
-					.with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction));
-		default:
-			throw new IllegalArgumentException("Illegal edge direction");
-		}
-	}
-
-	/**
-	 * Compute an aggregate over the neighbors (edges and vertices) of each
-	 * vertex. The function applied on the neighbors has access to the vertex
-	 * value.
-	 *
-	 * @param neighborsFunction the function to apply to the neighborhood
-	 * @param direction the edge direction (in-, out-, all-)
-	 * @param <T> the output type
-	 * @param typeInfo the explicit return type.
-	 * @return a dataset of a T
-	 * @throws IllegalArgumentException
-	 */
-	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
-												EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
-		switch (direction) {
-			case IN:
-				// create <edge-sourceVertex> pairs
-				DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
-						.join(this.vertices).where(0).equalTo(0);
-				return vertices.coGroup(edgesWithSources)
-						.where(0).equalTo("f0.f1")
-						.with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
-			case OUT:
-				// create <edge-targetVertex> pairs
-				DataSet<Tuple2<Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
-						.join(this.vertices).where(1).equalTo(0);
-				return vertices.coGroup(edgesWithTargets)
-						.where(0).equalTo("f0.f0")
-						.with(new ApplyNeighborCoGroupFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
-			case ALL:
-				// create <edge-sourceOrTargetVertex> pairs
-				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
-						.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
-						.join(this.vertices).where(1).equalTo(0)
-						.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
-
-				return vertices.coGroup(edgesWithNeighbors)
-						.where(0).equalTo(0)
-						.with(new ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
-			default:
-				throw new IllegalArgumentException("Illegal edge direction");
-		}
-	}
-
-
-	/**
-	 * Compute an aggregate over the neighbors (edges and vertices) of each
-	 * vertex. The function applied on the neighbors only has access to the
-	 * vertex id (not the vertex value).
-	 * 
-	 * @param neighborsFunction the function to apply to the neighborhood
-	 * @param direction the edge direction (in-, out-, all-)
-	 * @param <T> the output type
-	 * @return a dataset of a T
-	 * @throws IllegalArgumentException
-	 */
-	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
-												EdgeDirection direction) throws IllegalArgumentException {
-		switch (direction) {
-		case IN:
-			// create <edge-sourceVertex> pairs
-			DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
-					.join(this.vertices).where(0).equalTo(0)
-					.with(new ProjectVertexIdJoin<K, VV, EV>(1))
-					.withForwardedFieldsFirst("f1->f0");
-			return edgesWithSources.groupBy(0).reduceGroup(
-					new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction));
-		case OUT:
-			// create <edge-targetVertex> pairs
-			DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
-					.join(this.vertices).where(1).equalTo(0)
-					.with(new ProjectVertexIdJoin<K, VV, EV>(0))
-					.withForwardedFieldsFirst("f0");
-			return edgesWithTargets.groupBy(0).reduceGroup(
-					new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction));
-		case ALL:
-			// create <edge-sourceOrTargetVertex> pairs
-			DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
-					.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
-					.join(this.vertices).where(1).equalTo(0)
-					.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
-
-			return edgesWithNeighbors.groupBy(0).reduceGroup(
-					new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction));
-		default:
-			throw new IllegalArgumentException("Illegal edge direction");
-		}
-	}
-
-	/**
-	 * Compute an aggregate over the neighbors (edges and vertices) of each
-	 * vertex. The function applied on the neighbors only has access to the
-	 * vertex id (not the vertex value).
-	 *
-	 * @param neighborsFunction the function to apply to the neighborhood
-	 * @param direction the edge direction (in-, out-, all-)
-	 * @param <T> the output type
-	 * @param typeInfo the explicit return type.
-	 * @return a dataset of a T
-	 * @throws IllegalArgumentException
-	 */
-	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
-												EdgeDirection direction, TypeInformation<T> typeInfo) throws IllegalArgumentException {
-		switch (direction) {
-			case IN:
-				// create <edge-sourceVertex> pairs
-				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
-						.join(this.vertices).where(0).equalTo(0)
-						.with(new ProjectVertexIdJoin<K, VV, EV>(1))
-						.withForwardedFieldsFirst("f1->f0");
-				return edgesWithSources.groupBy(0).reduceGroup(
-						new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
-			case OUT:
-				// create <edge-targetVertex> pairs
-				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
-						.join(this.vertices).where(1).equalTo(0)
-						.with(new ProjectVertexIdJoin<K, VV, EV>(0))
-						.withForwardedFieldsFirst("f0");
-				return edgesWithTargets.groupBy(0).reduceGroup(
-						new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
-			case ALL:
-				// create <edge-sourceOrTargetVertex> pairs
-				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
-						.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
-						.join(this.vertices).where(1).equalTo(0)
-						.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
-
-				return edgesWithNeighbors.groupBy(0).reduceGroup(
-						new ApplyNeighborGroupReduceFunction<K, VV, EV, T>(neighborsFunction)).returns(typeInfo);
-			default:
-				throw new IllegalArgumentException("Illegal edge direction");
-		}
-	}
-
-	private static final class ApplyNeighborGroupReduceFunction<K, VV, EV, T>
-			implements GroupReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
-
-		private NeighborsFunction<K, VV, EV, T> function;
-
-		public ApplyNeighborGroupReduceFunction(NeighborsFunction<K, VV, EV, T> fun) {
-			this.function = fun;
-		}
-
-		public void reduce(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edges, Collector<T> out) throws Exception {
-			function.iterateNeighbors(edges, out);
-		}
-
-		@Override
-		public TypeInformation<T> getProducedType() {
-			return TypeExtractor.createTypeInfo(NeighborsFunction.class, function.getClass(), 3, null, null);
-		}
-	}
-
-	@ForwardedFieldsSecond("f1")
-	private static final class ProjectVertexWithNeighborValueJoin<K, VV, EV>
-			implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple2<K, VV>> {
-
-		private int fieldPosition;
-
-		public ProjectVertexWithNeighborValueJoin(int position) {
-			this.fieldPosition = position;
-		}
-
-		@SuppressWarnings("unchecked")
-		public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex, 
-				Collector<Tuple2<K, VV>> out) {
-			out.collect(new Tuple2<K, VV>((K) edge.getField(fieldPosition), otherVertex.getValue()));
-		}
-	}
-
-	private static final class ProjectVertexIdJoin<K, VV, EV> implements FlatJoinFunction<
-		Edge<K, EV>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
-
-		private int fieldPosition;
-
-		public ProjectVertexIdJoin(int position) {
-			this.fieldPosition = position;
-		}
-
-		@SuppressWarnings("unchecked")
-		public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
-						Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
-			out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>((K) edge.getField(fieldPosition), edge, otherVertex));
-		}
-	}
-
-	@ForwardedFieldsFirst("f0")
-	@ForwardedFieldsSecond("f1")
-	private static final class ProjectNeighborValue<K, VV, EV> implements FlatJoinFunction<
-		Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple2<K, VV>> {
-
-		public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
-				Collector<Tuple2<K, VV>> out) {
-
-			out.collect(new Tuple2<K, VV>(keysWithEdge.f0, neighbor.getValue()));
-		}
-	}
-
-	@ForwardedFieldsFirst("f0; f2->f1")
-	@ForwardedFieldsSecond("*->f2")
-	private static final class ProjectEdgeWithNeighbor<K, VV, EV> implements FlatJoinFunction<
-		Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
-
-		public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
-						Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
-			out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(keysWithEdge.f0, keysWithEdge.f2, neighbor));
-		}
-	}
-
-	private static final class ApplyNeighborCoGroupFunction<K, VV, EV, T> implements CoGroupFunction<
-		Vertex<K, VV>, Tuple2<Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
-
-		private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
-
-		public ApplyNeighborCoGroupFunction(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) {
-			this.function = fun;
-		}
-
-		public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors,
-				Collector<T> out) throws Exception {
-			function.iterateNeighbors(vertex.iterator().next(),	neighbors, out);
-		}
-
-		@Override
-		public TypeInformation<T> getProducedType() {
-			return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class,	function.getClass(), 3, null, null);
-		}
-	}
-
-	private static final class ApplyCoGroupFunctionOnAllNeighbors<K, VV, EV, T>
-			implements CoGroupFunction<Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>, T>, ResultTypeQueryable<T> {
-
-		private NeighborsFunctionWithVertexValue<K, VV, EV, T> function;
-
-		public ApplyCoGroupFunctionOnAllNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> fun) {
-			this.function = fun;
-		}
-
-		public void coGroup(Iterable<Vertex<K, VV>> vertex,
-				final Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithNeighbors, 
-				Collector<T> out) throws Exception {
-
-			final Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighborsIterator = new Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() {
-
-				final Iterator<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> keysWithEdgesIterator = keysWithNeighbors.iterator();
-
-				@Override
-				public boolean hasNext() {
-					return keysWithEdgesIterator.hasNext();
-				}
-
-				@Override
-				public Tuple2<Edge<K, EV>, Vertex<K, VV>> next() {
-					Tuple3<K, Edge<K, EV>, Vertex<K, VV>> next = keysWithEdgesIterator.next();
-					return new Tuple2<Edge<K, EV>, Vertex<K, VV>>(next.f1, next.f2);
-				}
-
-				@Override
-				public void remove() {
-					keysWithEdgesIterator.remove();
-				}
-			};
-
-			Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighborsIterable = new Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>>() {
-				public Iterator<Tuple2<Edge<K, EV>, Vertex<K, VV>>> iterator() {
-					return neighborsIterator;
-				}
-			};
-
-			Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
-
-			if (vertexIterator.hasNext()) {
-				function.iterateNeighbors(vertexIterator.next(), neighborsIterable, out);
-			} else {
-				throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
-			}
-		}
-
-		@Override
-		public TypeInformation<T> getProducedType() {
-			return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class,	function.getClass(), 3, null, null);
-		}
-	}
-
-	/**
-	 * Compute an aggregate over the neighbor values of each
-	 * vertex.
-	 *
-	 * @param reduceNeighborsFunction the function to apply to the neighborhood
-	 * @param direction the edge direction (in-, out-, all-)
-	 * @return a Dataset containing one value per vertex (vertex id, aggregate vertex value)
-	 * @throws IllegalArgumentException
-	 */
-	public DataSet<Tuple2<K, VV>> reduceOnNeighbors(ReduceNeighborsFunction<VV> reduceNeighborsFunction,
-									EdgeDirection direction) throws IllegalArgumentException {
-		switch (direction) {
-			case IN:
-				// create <vertex-source value> pairs
-				final DataSet<Tuple2<K, VV>> verticesWithSourceNeighborValues = edges
-						.join(this.vertices).where(0).equalTo(0)
-						.with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(1))
-						.withForwardedFieldsFirst("f1->f0");
-				return verticesWithSourceNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(
-						reduceNeighborsFunction));
-			case OUT:
-				// create <vertex-target value> pairs
-				DataSet<Tuple2<K, VV>> verticesWithTargetNeighborValues = edges
-						.join(this.vertices).where(1).equalTo(0)
-						.with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(0))
-						.withForwardedFieldsFirst("f0");
-				return verticesWithTargetNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(
-						reduceNeighborsFunction));
-			case ALL:
-				// create <vertex-neighbor value> pairs
-				DataSet<Tuple2<K, VV>> verticesWithNeighborValues = edges
-						.flatMap(new EmitOneEdgeWithNeighborPerNode<K, EV>())
-						.join(this.vertices).where(1).equalTo(0)
-						.with(new ProjectNeighborValue<K, VV, EV>());
-
-				return verticesWithNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(
-						reduceNeighborsFunction));
-			default:
-				throw new IllegalArgumentException("Illegal edge direction");
-		}
-	}
-
-	@ForwardedFields("f0")
-	private static final class ApplyNeighborReduceFunction<K, VV> implements ReduceFunction<Tuple2<K, VV>> {
-
-		private ReduceNeighborsFunction<VV> function;
-
-		public ApplyNeighborReduceFunction(ReduceNeighborsFunction<VV> fun) {
-			this.function = fun;
-		}
-
-		@Override
-		public Tuple2<K, VV> reduce(Tuple2<K, VV> first, Tuple2<K, VV> second) throws Exception {
-			first.setField(function.reduceNeighbors(first.f1, second.f1), 1);
-			return first;
-		}
-	}
-
-	/**
-	 * Compute an aggregate over the edge values of each vertex.
-	 *
-	 * @param reduceEdgesFunction
-	 *            the function to apply to the neighborhood
-	 * @param direction
-	 *            the edge direction (in-, out-, all-)
-	 * @return a Dataset containing one value per vertex(vertex key, aggregate edge value)
-	 * @throws IllegalArgumentException
-	 */
-	public DataSet<Tuple2<K, EV>> reduceOnEdges(ReduceEdgesFunction<EV> reduceEdgesFunction,
-								EdgeDirection direction) throws IllegalArgumentException {
-
-		switch (direction) {
-			case IN:
-				return edges.map(new ProjectVertexWithEdgeValueMap<K, EV>(1))
-						.withForwardedFields("f1->f0")
-						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
-			case OUT:
-				return edges.map(new ProjectVertexWithEdgeValueMap<K, EV>(0))
-						.withForwardedFields("f0->f0")
-						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
-			case ALL:
-				return edges.flatMap(new EmitOneVertexWithEdgeValuePerNode<K, EV>())
-						.withForwardedFields("f2->f1")
-						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
-			default:
-				throw new IllegalArgumentException("Illegal edge direction");
-		}
-	}
-
-	@ForwardedFields("f0")
-	private static final class ApplyReduceFunction<K, EV> implements ReduceFunction<Tuple2<K, EV>> {
-
-		private ReduceEdgesFunction<EV> function;
-
-		public ApplyReduceFunction(ReduceEdgesFunction<EV> fun) {
-			this.function = fun;
-		}
-
-		@Override
-		public Tuple2<K, EV> reduce(Tuple2<K, EV> first, Tuple2<K, EV> second) throws Exception {
-			first.setField(function.reduceEdges(first.f1, second.f1), 1);
-			return first;
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/91ffbc1e/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
deleted file mode 100644
index 08cf011..0000000
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/GraphAlgorithm.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph;
-
-/**
- * @param <K> key type
- * @param <VV> vertex value type
- * @param <EV> edge value type
- * @param <T> the return type
- */
-public interface GraphAlgorithm<K, VV, EV, T> {
-
-	public T run(Graph<K, VV, EV> input) throws Exception;
-}