You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:30:44 UTC

[3/5] flink git commit: [FLINK-1758] [gelly] Neighborhood Methods Extensions

[FLINK-1758] [gelly] Neighborhood Methods Extensions


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9de640af
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9de640af
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9de640af

Branch: refs/heads/master
Commit: 9de640af9cd23927bbd59a0b5794b93e85956551
Parents: 6e24879
Author: andralungu <lu...@gmail.com>
Authored: Tue Apr 7 22:29:18 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sun Apr 26 14:05:38 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |  59 +--
 .../org/apache/flink/graph/EdgeDirection.java   |   8 +-
 .../org/apache/flink/graph/EdgesFunction.java   |   5 +-
 .../graph/EdgesFunctionWithVertexValue.java     |   5 +-
 .../main/java/org/apache/flink/graph/Graph.java | 159 ++++++-
 .../apache/flink/graph/NeighborsFunction.java   |   5 +-
 .../graph/NeighborsFunctionWithVertexValue.java |   5 +-
 .../apache/flink/graph/ReduceEdgesFunction.java |  36 ++
 .../flink/graph/ReduceNeighborsFunction.java    |  40 ++
 .../flink/graph/example/MusicProfiles.java      |   8 +-
 .../operations/ReduceOnEdgesMethodsITCase.java  | 457 ++++++++++++++++---
 .../ReduceOnNeighborMethodsITCase.java          | 456 ++++++++++++++++--
 12 files changed, 1090 insertions(+), 153 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 21bc335..1a02e6d 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -266,7 +266,10 @@ Neighborhood Methods
 
 Neighborhood methods allow vertices to perform an aggregation on their first-hop neighborhood.
 
-`reduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex, while `reduceOnNeighbors()` has access on both the neighboring edges and vertices. The neighborhood scope is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors).
+`groupReduceOnEdges()` can be used to compute an aggregation on the neighboring edges of a vertex, while `groupReduceOnNeighbors()` has access on both the neighboring edges and vertices. The neighborhood scope is defined by the `EdgeDirection` parameter, which takes the values `IN`, `OUT` or `ALL`. `IN` will gather all in-coming edges (neighbors) of a vertex, `OUT` will gather all out-going edges (neighbors), while `ALL` will gather all edges (neighbors).
+
+The `groupReduceOnEdges()` and `groupReduceOnNeighbors()` methods return zero, one or more values per vertex.
+When returning a single value per vertex, `reduceOnEdges()` or `reduceOnNeighbors()` should be called as they are more efficient.
 
 For example, assume that you want to select the minimum weight of all out-edges for each vertex in the following graph:
 
@@ -279,25 +282,28 @@ The following code will collect the out-edges for each vertex and apply the `Sel
 {% highlight java %}
 Graph<Long, Long, Double> graph = ...
 
-DataSet<Tuple2<Long, Double>> minWeights = graph.reduceOnEdges(
+DataSet<Tuple2<Long, Double>> minWeights = graph.groupReduceOnEdges(
 				new SelectMinWeight(), EdgeDirection.OUT);
 
 // user-defined function to select the minimum weight
-static final class SelectMinWeight implements EdgesFunction<Long, Double, Tuple2<Long, Double>> {
+static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
 
-    public Tuple2<Long, Double> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Double>>> edges) {
+		@Override
+		public void iterateEdges(Vertex<Long, Long> v,
+				Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
 
-        long minWeight = Double.MAX_VALUE;
-        long vertexId = -1;
+			long weight = Long.MAX_VALUE;
+			long minNeighborId = 0;
 
-        for (Tuple2<Long, Edge<Long, Double>> edge: edges) {
-            if (edge.f1.getValue() < weight) {
-            weight = edge.f1.getValue();
-            vertexId = edge.f0;
-        }
-        return new Tuple2<Long, Double>(vertexId, minWeight);
-    }
-}
+			for (Edge<Long, Long> edge: edges) {
+				if (edge.getValue() < weight) {
+					weight = edge.getValue();
+					minNeighborId = edge.getTarget();
+				}
+			}
+			out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
+		}
+	}
 {% endhighlight %}
 
 <p class="text-center">
@@ -313,28 +319,23 @@ DataSet<Tuple2<Long, Long>> verticesWithSum = graph.reduceOnNeighbors(
 				new SumValues(), EdgeDirection.IN);
 
 // user-defined function to sum the neighbor values
-static final class SumValues implements NeighborsFunction<Long, Long, Double, Tuple2<Long, Long>> {
-		
-	public Tuple2<Long, Long> iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Double>, 
-		Vertex<Long, Long>>> neighbors) {
-		
-		long sum = 0;
-		long vertexId = -1;
+static final class SumValues implements ReduceNeighborsFunction<Long, Long, Double> {
 
-		for (Tuple3<Long, Edge<Long, Double>, Vertex<Long, Long>> neighbor : neighbors) {
-			vertexId = neighbor.f0;
-			sum += neighbor.f2.getValue();
-		}
-		return new Tuple2<Long, Long>(vertexId, sum);
-	}
+    public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
+    																		Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
+
+    	long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue();
+    	return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0, firstNeighbor.f1,
+    			new Vertex<Long, Long>(firstNeighbor.f0, sum));
+    }
 }
 {% endhighlight %}
 
 <p class="text-center">
-    <img alt="reduseOnNeighbors Example" width="70%" src="fig/gelly-reduceOnNeighbors.png"/>
+    <img alt="reduceOnNeighbors Example" width="70%" src="img/gelly-reduceOnNeighbors.png"/>
 </p>
 
-When the aggregation computation does not require access to the vertex value (for which the aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction` for the user-defined functions. When access to the vertex value is required, one should use `EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead. 
+When the aggregation computation does not require access to the vertex value (for which the aggregation is performed), it is advised to use the more efficient `EdgesFunction` and `NeighborsFunction` for the user-defined functions. When access to the vertex value is required, one should use `EdgesFunctionWithVertexValue` and `NeighborsFunctionWithVertexValue` instead.
 
 [Back to top](#top)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
index 379b302..65d4098 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgeDirection.java
@@ -20,10 +20,10 @@ package org.apache.flink.graph;
 
 /**
  * The EdgeDirection is used to select a node's neighborhood
- * by the {@link Graph#reduceOnEdges(EdgesFunction, EdgeDirection)},
- * {@link Graph#reduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)},
- * {@link Graph#reduceOnNeighbors(NeighborsFunction, EdgeDirection)} and
- * {@link Graph#reduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}
+ * by the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)},
+ * {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)},
+ * {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)} and
+ * {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}
  * methods.
  */
 public enum EdgeDirection {

http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
index d35385f..aac63db 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunction.java
@@ -22,10 +22,11 @@ import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
 
 /**
  * Interface to be implemented by the function applied to a vertex neighborhood
- * in the {@link Graph#reduceOnEdges(EdgesFunction, EdgeDirection)} method.
+ * in the {@link Graph#groupReduceOnEdges(EdgesFunction, EdgeDirection)} method.
  *
  * @param <K> the vertex key type
  * @param <EV> the edge value type
@@ -34,5 +35,5 @@ import org.apache.flink.api.java.tuple.Tuple2;
 public interface EdgesFunction<K extends Comparable<K> & Serializable, 
 	EV extends Serializable, O> extends Function, Serializable {
 
-	O iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges) throws Exception;
+	void iterateEdges(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
index dd0f518..f4f4320 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/EdgesFunctionWithVertexValue.java
@@ -21,10 +21,11 @@ package org.apache.flink.graph;
 import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.util.Collector;
 
 /**
  * Interface to be implemented by the function applied to a vertex neighborhood
- * in the {@link Graph#reduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}
+ * in the {@link Graph#groupReduceOnEdges(EdgesFunctionWithVertexValue, EdgeDirection)}
  * method.
  *
  * @param <K> the vertex key type
@@ -35,5 +36,5 @@ import org.apache.flink.api.common.functions.Function;
 public interface EdgesFunctionWithVertexValue<K extends Comparable<K> & Serializable, 
 	VV extends Serializable, EV extends Serializable, O> extends Function, Serializable {
 
-	O iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges) throws Exception;
+	void iterateEdges(Vertex<K, VV> v, Iterable<Edge<K, EV>> edges, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index f843827..1325e0c 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -28,10 +28,11 @@ import java.util.Arrays;
 import org.apache.flink.api.common.functions.CoGroupFunction;
 import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.functions.FlatMapFunction;
-import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
-import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
@@ -722,8 +723,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @return a dataset of a T
 	 * @throws IllegalArgumentException
 	 */
-	public <T> DataSet<T> reduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
-			EdgeDirection direction) throws IllegalArgumentException {
+	public <T> DataSet<T> groupReduceOnEdges(EdgesFunctionWithVertexValue<K, VV, EV, T> edgesFunction,
+											 EdgeDirection direction) throws IllegalArgumentException {
 
 		switch (direction) {
 		case IN:
@@ -753,8 +754,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @return a dataset of T
 	 * @throws IllegalArgumentException
 	 */
-	public <T> DataSet<T> reduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
-			EdgeDirection direction) throws IllegalArgumentException {
+	public <T> DataSet<T> groupReduceOnEdges(EdgesFunction<K, EV, T> edgesFunction,
+											 EdgeDirection direction) throws IllegalArgumentException {
 
 		switch (direction) {
 		case IN:
@@ -796,7 +797,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 
 		public void reduce(Iterable<Tuple2<K, Edge<K, EV>>> edges, Collector<T> out) throws Exception {
-			out.collect(function.iterateEdges(edges));
+			function.iterateEdges(edges, out);
 		}
 
 		@Override
@@ -832,7 +833,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 
 		public void coGroup(Iterable<Vertex<K, VV>> vertex,
 				Iterable<Edge<K, EV>> edges, Collector<T> out) throws Exception {
-			out.collect(function.iterateEdges(vertex.iterator().next(), edges));
+			function.iterateEdges(vertex.iterator().next(), edges, out);
 		}
 
 		@Override
@@ -880,7 +881,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 				}
 			};
 
-			out.collect(function.iterateEdges(vertex.iterator().next(),	edgesIterable));
+			function.iterateEdges(vertex.iterator().next(),	edgesIterable, out);
 		}
 
 		@Override
@@ -1238,8 +1239,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @return a dataset of a T
 	 * @throws IllegalArgumentException
 	 */
-	public <T> DataSet<T> reduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
-			EdgeDirection direction) throws IllegalArgumentException {
+	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunctionWithVertexValue<K, VV, EV, T> neighborsFunction,
+												 EdgeDirection direction) throws IllegalArgumentException {
 		switch (direction) {
 		case IN:
 			// create <edge-sourceVertex> pairs
@@ -1281,8 +1282,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 * @return a dataset of a T
 	 * @throws IllegalArgumentException
 	 */
-	public <T> DataSet<T> reduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
-			EdgeDirection direction) throws IllegalArgumentException {
+	public <T> DataSet<T> groupReduceOnNeighbors(NeighborsFunction<K, VV, EV, T> neighborsFunction,
+												 EdgeDirection direction) throws IllegalArgumentException {
 		switch (direction) {
 		case IN:
 			// create <edge-sourceVertex> pairs
@@ -1322,8 +1323,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 
 		public void reduce(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edges, Collector<T> out) throws Exception {
-			out.collect(function.iterateNeighbors(edges));
-
+			function.iterateNeighbors(edges, out);
 		}
 
 		@Override
@@ -1368,7 +1368,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 
 		public void coGroup(Iterable<Vertex<K, VV>> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors,
 				Collector<T> out) throws Exception {
-			out.collect(function.iterateNeighbors(vertex.iterator().next(),	neighbors));
+			function.iterateNeighbors(vertex.iterator().next(),	neighbors, out);
 		}
 
 		@Override
@@ -1417,8 +1417,7 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 				}
 			};
 
-			out.collect(function.iterateNeighbors(vertex.iterator().next(),
-					neighborsIterable));
+			function.iterateNeighbors(vertex.iterator().next(), neighborsIterable, out);
 		}
 
 		@Override
@@ -1426,4 +1425,126 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 			return TypeExtractor.createTypeInfo(NeighborsFunctionWithVertexValue.class,	function.getClass(), 3, null, null);
 		}
 	}
-}
\ No newline at end of file
+
+	/**
+	 * Compute an aggregate over the neighbors (edges and vertices) of each
+	 * vertex. The function applied on the neighbors only has access to the
+	 * vertex id (not the vertex value).
+	 *
+	 * @param reduceNeighborsFunction the function to apply to the neighborhood
+	 * @param direction the edge direction (in-, out-, all-)
+	 * @return a dataset containing one value per vertex
+	 * @throws IllegalArgumentException
+	 */
+	public DataSet reduceOnNeighbors(ReduceNeighborsFunction<K, VV, EV> reduceNeighborsFunction,
+									 EdgeDirection direction) throws IllegalArgumentException {
+		switch (direction) {
+			case IN:
+				// create <edge-sourceVertex> pairs
+				final DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
+						.join(this.vertices).where(0).equalTo(0)
+						.with(new ProjectVertexIdJoin<K, VV, EV>(1));
+				return edgesWithSources.groupBy(0).reduce(new ApplyNeighborReduceFunction<K,VV,EV>(reduceNeighborsFunction))
+						.map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
+			case OUT:
+				// create <edge-targetVertex> pairs
+				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
+						.join(this.vertices).where(1).equalTo(0)
+						.with(new ProjectVertexIdJoin<K, VV, EV>(0));
+				return edgesWithTargets.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV, EV>(reduceNeighborsFunction))
+						.map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
+			case ALL:
+				// create <edge-sourceOrTargetVertex> pairs
+				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
+						.flatMap(new EmitOneEdgeWithNeighborPerNode<K, VV, EV>())
+						.join(this.vertices).where(1).equalTo(0)
+						.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+
+				return edgesWithNeighbors.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV, EV>(reduceNeighborsFunction))
+						.map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+	}
+
+	private static final class ApplyNeighborReduceFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+			implements ReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+
+		private ReduceNeighborsFunction<K, VV, EV> function;
+
+		public ApplyNeighborReduceFunction(ReduceNeighborsFunction<K, VV, EV> fun) {
+			this.function = fun;
+		}
+
+		@Override
+		public Tuple3<K, Edge<K, EV>, Vertex<K, VV>> reduce(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> first,
+															Tuple3<K, Edge<K, EV>, Vertex<K, VV>> second) throws Exception {
+			return function.reduceNeighbors(first, second);
+		}
+	}
+
+	public static final class ApplyNeighborhoodMapFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+			implements MapFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>> ,Tuple2<K, VV>> {
+
+		@Override
+		public Tuple2<K, VV> map(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> edgesWithSrc) throws Exception {
+			return new Tuple2<K, VV>(edgesWithSrc.f0, edgesWithSrc.f2.getValue());
+		}
+	}
+
+	/**
+	 * Compute an aggregate over the edges of each vertex. The function applied
+	 * on the edges only has access to the vertex id (not the vertex value).
+	 *
+	 * @param reduceEdgesFunction
+	 *            the function to apply to the neighborhood
+	 * @param direction
+	 *            the edge direction (in-, out-, all-)
+	 * @return a dataset containing one value per vertex
+	 * @throws IllegalArgumentException
+	 */
+	public DataSet reduceOnEdges(ReduceEdgesFunction<K, EV> reduceEdgesFunction,
+								 EdgeDirection direction) throws IllegalArgumentException {
+
+		switch (direction) {
+			case IN:
+				return edges.map(new ProjectVertexIdMap<K, EV>(1))
+						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
+						.map(new ApplyEdgesMapFunction());
+			case OUT:
+				return edges.map(new ProjectVertexIdMap<K, EV>(0))
+						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
+						.map(new ApplyEdgesMapFunction());
+			case ALL:
+				return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
+						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
+						.map(new ApplyEdgesMapFunction());
+			default:
+				throw new IllegalArgumentException("Illegal edge direction");
+		}
+	}
+
+	private static final class ApplyReduceFunction<K extends Comparable<K> & Serializable, EV extends Serializable>
+			implements ReduceFunction<Tuple2<K, Edge<K, EV>>> {
+
+		private ReduceEdgesFunction<K, EV> function;
+
+		public ApplyReduceFunction(ReduceEdgesFunction<K, EV> fun) {
+			this.function = fun;
+		}
+
+		@Override
+		public Tuple2<K, Edge<K, EV>> reduce(Tuple2<K, Edge<K, EV>> first, Tuple2<K, Edge<K, EV>> second) throws Exception {
+			return function.reduceEdges(first, second);
+		}
+	}
+
+	public static final class ApplyEdgesMapFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+			implements MapFunction<Tuple2<K, Edge<K, EV>> ,Tuple2<K, EV>> {
+
+		@Override
+		public Tuple2<K, EV> map(Tuple2<K, Edge<K, EV>> edge) throws Exception {
+			return new Tuple2<K, EV>(edge.f0, edge.f1.getValue());
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
index a2d28b2..b43f9d1 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunction.java
@@ -22,10 +22,11 @@ import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.util.Collector;
 
 /**
  * Interface to be implemented by the function applied to a vertex neighborhood
- * in the {@link Graph#reduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}
+ * in the {@link Graph#groupReduceOnNeighbors(NeighborsFunction, EdgeDirection)}
  * method.
  *
  * @param <K> the vertex key type
@@ -36,5 +37,5 @@ import org.apache.flink.api.java.tuple.Tuple3;
 public interface NeighborsFunction<K extends Comparable<K> & Serializable, VV extends Serializable, 
 	EV extends Serializable, O> extends Function, Serializable {
 
-	O iterateNeighbors(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors) throws Exception;
+	void iterateNeighbors(Iterable<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
index 438ed8a..32d184d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/NeighborsFunctionWithVertexValue.java
@@ -22,10 +22,11 @@ import java.io.Serializable;
 
 import org.apache.flink.api.common.functions.Function;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.util.Collector;
 
 /**
  * Interface to be implemented by the function applied to a vertex neighborhood
- * in the {@link Graph#reduceOnNeighbors(NeighborsFunction, EdgeDirection)}
+ * in the {@link Graph#groupReduceOnNeighbors(NeighborsFunctionWithVertexValue, EdgeDirection)}
  * method.
  *
  * @param <K> the vertex key type
@@ -36,5 +37,5 @@ import org.apache.flink.api.java.tuple.Tuple2;
 public interface NeighborsFunctionWithVertexValue<K extends Comparable<K> & Serializable, VV extends Serializable, 
 	EV extends Serializable, O> extends Function, Serializable {
 
-	O iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors) throws Exception;
+	void iterateNeighbors(Vertex<K, VV> vertex, Iterable<Tuple2<Edge<K, EV>, Vertex<K, VV>>> neighbors, Collector<O> out) throws Exception;
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
new file mode 100644
index 0000000..0b5d2cf
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.java.tuple.Tuple2;
+
+import java.io.Serializable;
+
+/**
+ * Interface to be implemented by the function applied to a vertex neighborhood
+ * in the {@link Graph#reduceOnEdges(org.apache.flink.graph.ReduceEdgesFunction, EdgeDirection)} method.
+ *
+ * @param <K> the vertex key type
+ * @param <EV> the edge value type
+ */
+public interface ReduceEdgesFunction<K extends Comparable<K> & Serializable,
+		EV extends Serializable> {
+
+	Tuple2<K, Edge<K, EV>> reduceEdges(Tuple2<K, Edge<K, EV>> firstEdge, Tuple2<K, Edge<K, EV>> secondEdge);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
new file mode 100644
index 0000000..50c0d35
--- /dev/null
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+import org.apache.flink.api.common.functions.Function;
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.io.Serializable;
+
+/**
+ * Interface to be implemented by the function applied to a vertex neighborhood
+ * in the {@link Graph#reduceOnNeighbors(ReduceNeighborsFunction, EdgeDirection)}
+ * method.
+ *
+ * @param <K> the vertex key type
+ * @param <VV> the vertex value type
+ * @param <EV> the edge value type
+ */
+public interface ReduceNeighborsFunction <K extends Comparable<K> & Serializable, VV extends Serializable,
+		EV extends Serializable> extends Function, Serializable {
+
+	Tuple3<K, Edge<K, EV>, Vertex<K, VV>> reduceNeighbors(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> firstNeighbor,
+														  Tuple3<K, Edge<K, EV>, Vertex<K, VV>> secondNeighbor);
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
index 9b18623..e8871eb 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/example/MusicProfiles.java
@@ -100,7 +100,7 @@ public class MusicProfiles implements ProgramDescription {
 		 * Get the top track (most listened) for each user
 		 */
 		DataSet<Tuple2<String, String>> usersWithTopTrack = userSongGraph
-				.reduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT)
+				.groupReduceOnEdges(new GetTopSongPerUser(), EdgeDirection.OUT)
 				.filter(new FilterSongNodes());
 
 		if (fileOutput) {
@@ -185,8 +185,8 @@ public class MusicProfiles implements ProgramDescription {
 	public static final class GetTopSongPerUser	implements EdgesFunctionWithVertexValue<String, NullValue, Integer,
 		Tuple2<String, String>> {
 
-		public Tuple2<String, String> iterateEdges(Vertex<String, NullValue> vertex, 
-				Iterable<Edge<String, Integer>> edges) {
+		public void iterateEdges(Vertex<String, NullValue> vertex,
+				Iterable<Edge<String, Integer>> edges, Collector<Tuple2<String, String>> out) throws Exception {
 
 			int maxPlaycount = 0;
 			String topSong = "";
@@ -196,7 +196,7 @@ public class MusicProfiles implements ProgramDescription {
 					topSong = edge.getTarget();
 				}
 			}
-			return new Tuple2<String, String>(vertex.getId(), topSong);
+			out.collect(new Tuple2<String, String>(vertex.getId(), topSong));
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
index ec0c84c..2452cba 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
@@ -26,9 +26,11 @@ import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.EdgesFunction;
 import org.apache.flink.graph.EdgesFunctionWithVertexValue;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.ReduceEdgesFunction;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -71,7 +73,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
 		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-				graph.reduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
+				graph.groupReduceOnEdges(new SelectMinWeightNeighbor(), EdgeDirection.OUT);
 		verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
 		env.execute();
 	
@@ -93,7 +95,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
 		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-				graph.reduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN);
+				graph.groupReduceOnEdges(new SelectMinWeightInNeighbor(), EdgeDirection.IN);
 		verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
 		env.execute();
 
@@ -105,6 +107,210 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testAllOutNeighbors() throws Exception {
+		/*
+		 * Get the all the out-neighbors for each vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
+				graph.groupReduceOnEdges(new SelectOutNeighbors(), EdgeDirection.OUT);
+		verticesWithAllOutNeighbors.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2\n" +
+				"1,3\n" +
+				"2,3\n" +
+				"3,4\n" +
+				"3,5\n" +
+				"4,5\n" +
+				"5,1";
+	}
+
+	@Test
+	public void testAllOutNeighborsNoValue() throws Exception {
+		/*
+		 * Get the all the out-neighbors for each vertex except for the vertex with id 5.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
+				graph.groupReduceOnEdges(new SelectOutNeighborsExcludeFive(), EdgeDirection.OUT);
+		verticesWithAllOutNeighbors.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2\n" +
+				"1,3\n" +
+				"2,3\n" +
+				"3,4\n" +
+				"3,5\n" +
+				"4,5";
+	}
+
+	@Test
+	public void testAllOutNeighborsWithValueGreaterThanTwo() throws Exception {
+		/*
+		 * Get the all the out-neighbors for each vertex that have a value greater than two.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithAllOutNeighbors =
+				graph.groupReduceOnEdges(new SelectOutNeighborsValueGreaterThanTwo(), EdgeDirection.OUT);
+		verticesWithAllOutNeighbors.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "3,4\n" +
+				"3,5\n" +
+				"4,5\n" +
+				"5,1";
+	}
+
+	@Test
+	public void testAllInNeighbors() throws Exception {
+		/*
+		 * Get the all the in-neighbors for each vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
+				graph.groupReduceOnEdges(new SelectInNeighbors(), EdgeDirection.IN);
+		verticesWithAllInNeighbors.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,5\n" +
+				"2,1\n" +
+				"3,1\n" +
+				"3,2\n" +
+				"4,3\n" +
+				"5,3\n" +
+				"5,4";
+	}
+
+	@Test
+	public void testAllInNeighborsNoValue() throws Exception {
+		/*
+		 * Get the all the in-neighbors for each vertex except for the vertex with id 5.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
+				graph.groupReduceOnEdges(new SelectInNeighborsExceptFive(), EdgeDirection.IN);
+		verticesWithAllInNeighbors.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,5\n" +
+				"2,1\n" +
+				"3,1\n" +
+				"3,2\n" +
+				"4,3";
+	}
+
+	@Test
+	public void testAllInNeighborsWithValueGreaterThanTwo() throws Exception {
+		/*
+		 * Get the all the in-neighbors for each vertex that have a value greater than two.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithAllInNeighbors =
+				graph.groupReduceOnEdges(new SelectInNeighborsValueGreaterThanTwo(), EdgeDirection.IN);
+		verticesWithAllInNeighbors.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "3,1\n" +
+				"3,2\n" +
+				"4,3\n" +
+				"5,3\n" +
+				"5,4";
+	}
+
+	@Test
+	public void testAllNeighbors() throws Exception {
+		/*
+		 * Get the all the neighbors for each vertex
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
+				graph.groupReduceOnEdges(new SelectNeighbors(), EdgeDirection.ALL);
+		verticesWithAllNeighbors.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2\n" +
+				"1,3\n" +
+				"1,5\n" +
+				"2,1\n" +
+				"2,3\n" +
+				"3,1\n" +
+				"3,2\n" +
+				"3,4\n" +
+				"3,5\n" +
+				"4,3\n" +
+				"4,5\n" +
+				"5,1\n" +
+				"5,3\n" +
+				"5,4";
+	}
+
+	@Test
+	public void testAllNeighborsNoValue() throws Exception {
+		/*
+		 * Get the all the neighbors for each vertex except for vertices with id 5 and 2.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
+				graph.groupReduceOnEdges(new SelectNeighborsExceptFiveAndTwo(), EdgeDirection.ALL);
+		verticesWithAllNeighbors.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,2\n" +
+				"1,3\n" +
+				"1,5\n" +
+				"3,1\n" +
+				"3,2\n" +
+				"3,4\n" +
+				"3,5\n" +
+				"4,3\n" +
+				"4,5";
+	}
+
+	@Test
+	public void testAllNeighborsWithValueGreaterThanFour() throws Exception {
+		/*
+		 * Get the all the neighbors for each vertex that have a value greater than four.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithAllNeighbors =
+				graph.groupReduceOnEdges(new SelectNeighborsValueGreaterThanFour(), EdgeDirection.ALL);
+		verticesWithAllNeighbors.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "5,1\n" +
+				"5,3\n" +
+				"5,4";
+	}
+
+	@Test
 	public void testMaxWeightEdge() throws Exception {
 		/*
 		 * Get the maximum weight among all edges
@@ -115,7 +321,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
 		DataSet<Tuple2<Long, Long>> verticesWithMaxEdgeWeight = 
-				graph.reduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL);
+				graph.groupReduceOnEdges(new SelectMaxWeightNeighbor(), EdgeDirection.ALL);
 		verticesWithMaxEdgeWeight.writeAsCsv(resultPath);
 		env.execute();
 
@@ -159,7 +365,7 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
 		DataSet<Tuple2<Long, Long>> verticesWithLowestOutNeighbor = 
-				graph.reduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
+				graph.groupReduceOnEdges(new SelectMinWeightInNeighborNoValue(), EdgeDirection.IN);
 		verticesWithLowestOutNeighbor.writeAsCsv(resultPath);
 		env.execute();
 
@@ -195,28 +401,29 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 	@SuppressWarnings("serial")
 	private static final class SelectMinWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
 
-		public Tuple2<Long, Long> iterateEdges(
-				Vertex<Long, Long> v,
-				Iterable<Edge<Long, Long>> edges) {
+		@Override
+		public void iterateEdges(Vertex<Long, Long> v,
+				Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
 			
 			long weight = Long.MAX_VALUE;
-			long minNeighorId = 0;
-			
+			long minNeighborId = 0;
+
 			for (Edge<Long, Long> edge: edges) {
 				if (edge.getValue() < weight) {
 					weight = edge.getValue();
-					minNeighorId = edge.getTarget();
+					minNeighborId = edge.getTarget();
 				}
 			}
-			return new Tuple2<Long, Long>(v.getId(), minNeighorId);
+			out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
 		}
 	}
 
 	@SuppressWarnings("serial")
 	private static final class SelectMaxWeightNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
 
-		public Tuple2<Long, Long> iterateEdges(Vertex<Long, Long> v,
-				Iterable<Edge<Long, Long>> edges) {
+		@Override
+		public void iterateEdges(Vertex<Long, Long> v,
+				Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
 			
 			long weight = Long.MIN_VALUE;
 
@@ -225,60 +432,65 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 					weight = edge.getValue();
 				}
 			}
-			return new Tuple2<Long, Long>(v.getId(), weight);
+			out.collect(new Tuple2<Long, Long>(v.getId(), weight));
 		}
 	}
 
 	@SuppressWarnings("serial")
-	private static final class SelectMinWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+	private static final class SelectMinWeightNeighborNoValue implements ReduceEdgesFunction<Long, Long> {
 
-		public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+		@Override
+		public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long, Long>> firstEdge,
+														  Tuple2<Long, Edge<Long, Long>> secondEdge) {
 
-			long weight = Long.MAX_VALUE;
-			long minNeighorId = 0;
-			long vertexId = -1;
-			long i=0;
-
-			for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
-				if (edge.f1.getValue() < weight) {
-					weight = edge.f1.getValue();
-					minNeighorId = edge.f1.getTarget();
-				}
-				if (i==0) {
-					vertexId = edge.f0;
-				} i++;
+			if(firstEdge.f1.getValue() < secondEdge.f1.getValue()) {
+				return firstEdge;
+			} else {
+				return secondEdge;
 			}
-			return new Tuple2<Long, Long>(vertexId, minNeighorId);
+
 		}
 	}
 
 	@SuppressWarnings("serial")
-	private static final class SelectMaxWeightNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
-
-		public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
-			
-			long weight = Long.MIN_VALUE;
-			long vertexId = -1;
-			long i=0;
-
-			for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
-				if (edge.f1.getValue() > weight) {
-					weight = edge.f1.getValue();
-				}
-				if (i==0) {
-					vertexId = edge.f0;
-				} i++;
+	private static final class SelectMaxWeightNeighborNoValue implements ReduceEdgesFunction<Long, Long> {
+
+//		@Override
+//		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+//								 Collector<Tuple2<Long, Long>> out) throws Exception {
+//
+//			long weight = Long.MIN_VALUE;
+//			long vertexId = -1;
+//			long i=0;
+//
+//			for (Tuple2<Long, Edge<Long, Long>> edge: edges) {
+//				if (edge.f1.getValue() > weight) {
+//					weight = edge.f1.getValue();
+//				}
+//				if (i==0) {
+//					vertexId = edge.f0;
+//				} i++;
+//			}
+//			out.collect(new Tuple2<Long, Long>(vertexId, weight));
+//		}
+
+		@Override
+		public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long, Long>> firstEdge,
+														  Tuple2<Long, Edge<Long, Long>> secondEdge) {
+			if(firstEdge.f1.getValue() > secondEdge.f1.getValue()) {
+				return firstEdge;
+			} else {
+				return secondEdge;
 			}
-			return new Tuple2<Long, Long>(vertexId, weight);
 		}
 	}
 
 	@SuppressWarnings("serial")
 	private static final class SelectMinWeightInNeighbor implements EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
 
-		public Tuple2<Long, Long> iterateEdges(
-				Vertex<Long, Long> v,
-				Iterable<Edge<Long, Long>> edges) {
+		@Override
+		public void iterateEdges(Vertex<Long, Long> v,
+				Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
 			
 			long weight = Long.MAX_VALUE;
 			long minNeighorId = 0;
@@ -289,14 +501,16 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 					minNeighorId = edge.getSource();
 				}
 			}
-			return new Tuple2<Long, Long>(v.getId(), minNeighorId);
+			out.collect(new Tuple2<Long, Long>(v.getId(), minNeighorId));
 		}
 	}
 
 	@SuppressWarnings("serial")
 	private static final class SelectMinWeightInNeighborNoValue implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
 
-		public Tuple2<Long, Long> iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges) {
+		@Override
+		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+								 Collector<Tuple2<Long, Long>> out) throws Exception {
 			
 			long weight = Long.MAX_VALUE;
 			long minNeighorId = 0;
@@ -312,7 +526,146 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 					vertexId = edge.f0;
 				} i++;
 			}
-			return new Tuple2<Long, Long>(vertexId, minNeighorId);
+			out.collect(new Tuple2<Long, Long>(vertexId, minNeighorId));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectOutNeighbors implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+								 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
+				out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectOutNeighborsExcludeFive implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+								 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
+				if(edge.f0 != 5) {
+					out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectOutNeighborsValueGreaterThanTwo implements
+			EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges,
+								 Collector<Tuple2<Long, Long>> out) throws Exception {
+			for (Edge<Long, Long> edge: edges) {
+				if(v.getValue() > 2) {
+					out.collect(new Tuple2<Long, Long>(v.getId(), edge.getTarget()));
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectInNeighbors implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+								 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
+				out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectInNeighborsExceptFive implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+								 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			for(Tuple2<Long, Edge<Long, Long>> edge : edges) {
+				if(edge.f0 != 5) {
+					out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectInNeighborsValueGreaterThanTwo implements
+			EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges,
+								 Collector<Tuple2<Long, Long>> out) throws Exception {
+			for (Edge<Long, Long> edge: edges) {
+				if(v.getValue() > 2) {
+					out.collect(new Tuple2<Long, Long>(v.getId(), edge.getSource()));
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectNeighbors implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+								 Collector<Tuple2<Long, Long>> out) throws Exception {
+			for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
+				if (edge.f0 == edge.f1.getTarget()) {
+					out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
+				} else {
+					out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectNeighborsExceptFiveAndTwo implements EdgesFunction<Long, Long, Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateEdges(Iterable<Tuple2<Long, Edge<Long, Long>>> edges,
+								 Collector<Tuple2<Long, Long>> out) throws Exception {
+			for (Tuple2<Long, Edge<Long, Long>> edge : edges) {
+				if(edge.f0 != 5 && edge.f0 != 2) {
+					if (edge.f0 == edge.f1.getTarget()) {
+						out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getSource()));
+					} else {
+						out.collect(new Tuple2<Long, Long>(edge.f0, edge.f1.getTarget()));
+					}
+				}
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SelectNeighborsValueGreaterThanFour implements
+			EdgesFunctionWithVertexValue<Long, Long, Long, Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateEdges(Vertex<Long, Long> v, Iterable<Edge<Long, Long>> edges,
+								 Collector<Tuple2<Long, Long>> out) throws Exception {
+			for(Edge<Long, Long> edge : edges) {
+				if(v.getValue() > 4) {
+					if(v.getId().equals(edge.getTarget())) {
+						out.collect(new Tuple2<Long, Long>(v.getId(), edge.getSource()));
+					} else {
+						out.collect(new Tuple2<Long, Long>(v.getId(), edge.getTarget()));
+					}
+				}
+			}
 		}
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/flink/blob/9de640af/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
index 785552c..5300d24 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.test.operations;
 
 import java.util.Iterator;
 
+import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,9 +30,11 @@ import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.NeighborsFunction;
 import org.apache.flink.graph.NeighborsFunctionWithVertexValue;
+import org.apache.flink.graph.ReduceNeighborsFunction;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.util.Collector;
 import org.junit.After;
 import org.junit.Before;
 import org.junit.Rule;
@@ -74,7 +77,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
 		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
-				graph.reduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
+				graph.groupReduceOnNeighbors(new SumOutNeighbors(), EdgeDirection.OUT);
 
 		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
 		env.execute();
@@ -96,7 +99,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
 		DataSet<Tuple2<Long, Long>> verticesWithSum = 
-				graph.reduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);		
+				graph.groupReduceOnNeighbors(new SumInNeighbors(), EdgeDirection.IN);
 
 		verticesWithSum.writeAsCsv(resultPath);
 		env.execute();
@@ -119,7 +122,7 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
 		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
-				graph.reduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
+				graph.groupReduceOnNeighbors(new SumAllNeighbors(), EdgeDirection.ALL);
 
 		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
 		env.execute();
@@ -132,6 +135,65 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 	}
 
 	@Test
+	public void testSumOfOutNeighborsIdGreaterThanThree() throws Exception {
+		/*
+		 * Get the sum of out-neighbor values
+		 * for each vertex with id greater than three.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+				graph.groupReduceOnNeighbors(new SumOutNeighborsIdGreaterThanThree(), EdgeDirection.OUT);
+
+		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "4,5\n" +
+				"5,1\n";
+	}
+
+	@Test
+	public void testSumOfInNeighborsIdGreaterThanThree() throws Exception {
+		/*
+		 * Get the sum of in-neighbor values
+		 * times the edge weights for each vertex with id greater than three.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSum =
+				graph.groupReduceOnNeighbors(new SumInNeighborsIdGreaterThanThree(), EdgeDirection.IN);
+
+		verticesWithSum.writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "4,102\n" +
+				"5,285\n";
+	}
+
+	@Test
+	public void testSumOfOAllNeighborsIdGreaterThanThree() throws Exception {
+		/*
+		 * Get the sum of all neighbor values
+		 * including own vertex value
+		 * for each vertex with id greater than three.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+				graph.groupReduceOnNeighbors(new SumAllNeighborsIdGreaterThanThree(), EdgeDirection.ALL);
+
+		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "4,12\n" +
+				"5,13\n";
+	}
+
+	@Test
 	public void testSumOfOutNeighborsNoValue() throws Exception {
 		/*
 		 * Get the sum of out-neighbor values
@@ -166,14 +228,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 
 		DataSet<Tuple2<Long, Long>> verticesWithSum = 
 				graph.reduceOnNeighbors(new SumInNeighborsNoValue(), EdgeDirection.IN);
-
 		verticesWithSum.writeAsCsv(resultPath);
 		env.execute();
 	
 		expectedResult = "1,255\n" +
-				"2,12\n" + 
+				"2,12\n" +
 				"3,59\n" +
-				"4,102\n" + 
+				"4,102\n" +
 				"5,285\n";
 	}
 
@@ -187,10 +248,10 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env), 
 				TestGraphUtils.getLongLongEdgeData(env), env);
 
-		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues = 
+		DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
 				graph.reduceOnNeighbors(new SumAllNeighborsNoValue(), EdgeDirection.ALL);
 
-		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+		verticesWithSumOfAllNeighborValues.writeAsCsv(resultPath);
 		env.execute();
 	
 		expectedResult = "1,10\n" +
@@ -200,33 +261,191 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 				"5,8\n";
 	}
 
+	@Test
+	public void testSumOfOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
+		/*
+		 * Get the sum of out-neighbor values
+		 * for each vertex with id greater than two as well as the same sum multiplied by two.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+				graph.groupReduceOnNeighbors(new SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.OUT);
+
+		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "3,9\n" +
+				"3,18\n" +
+				"4,5\n" +
+				"4,10\n" +
+				"5,1\n" +
+				"5,2";
+	}
+
+	@Test
+	public void testSumOfInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
+		/*
+		 * Get the sum of in-neighbor values
+		 * for each vertex with id greater than two as well as the same sum multiplied by two.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+				graph.groupReduceOnNeighbors(new SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.IN);
+		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "3,59\n" +
+				"3,118\n" +
+				"4,204\n" +
+				"4,102\n" +
+				"5,570\n" +
+				"5,285";
+	}
+
+	@Test
+	public void testSumOfAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo() throws Exception {
+		/*
+		 * Get the sum of all neighbor values
+		 * for each vertex with id greater than two as well as the same sum multiplied by two.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSumOfAllNeighborValues =
+				graph.groupReduceOnNeighbors(new SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo(), EdgeDirection.ALL);
+
+		verticesWithSumOfAllNeighborValues.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "3,12\n" +
+				"3,24\n" +
+				"4,8\n" +
+				"4,16\n" +
+				"5,8\n" +
+				"5,16";
+	}
+
+	@Test
+	public void testSumOfOutNeighborsMultipliedByTwo() throws Exception {
+		/*
+		 * Get the sum of out-neighbor values
+		 * for each vertex as well as the sum of out-neighbor values multiplied by two.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+				graph.groupReduceOnNeighbors(new SumOutNeighborsMultipliedByTwo(), EdgeDirection.OUT);
+
+		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "1,5\n" +
+				"1,10\n" +
+				"2,3\n" +
+				"2,6\n" +
+				"3,9\n" +
+				"3,18\n" +
+				"4,5\n" +
+				"4,10\n" +
+				"5,1\n" +
+				"5,2";
+	}
+
+	@Test
+	public void testSumOfInNeighborsSubtractOne() throws Exception {
+		/*
+		 * Get the sum of in-neighbor values
+		 * times the edge weights for each vertex as well as the same sum minus one.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSum =
+				graph.groupReduceOnNeighbors(new SumInNeighborsSubtractOne(), EdgeDirection.IN);
+
+		verticesWithSum.writeAsCsv(resultPath);
+		env.execute();
+		expectedResult = "1,255\n" +
+				"1,254\n" +
+				"2,12\n" +
+				"2,11\n" +
+				"3,59\n" +
+				"3,58\n" +
+				"4,102\n" +
+				"4,101\n" +
+				"5,285\n" +
+				"5,284";
+	}
+
+	@Test
+	public void testSumOfOAllNeighborsAddFive() throws Exception {
+		/*
+		 * Get the sum of all neighbor values
+		 * including own vertex value
+		 * for each vertex as well as the same sum plus five.
+         */
+		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
+		Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
+				TestGraphUtils.getLongLongEdgeData(env), env);
+
+		DataSet<Tuple2<Long, Long>> verticesWithSumOfOutNeighborValues =
+				graph.groupReduceOnNeighbors(new SumAllNeighborsAddFive(), EdgeDirection.ALL);
+
+		verticesWithSumOfOutNeighborValues.writeAsCsv(resultPath);
+		env.execute();
+
+		expectedResult = "1,11\n" +
+				"1,16\n" +
+				"2,6\n" +
+				"2,11\n" +
+				"3,15\n" +
+				"3,20\n" +
+				"4,12\n" +
+				"4,17\n" +
+				"5,13\n" +
+				"5,18";
+	}
+
 	@SuppressWarnings("serial")
 	private static final class SumOutNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
 	Tuple2<Long, Long>> {
 
-		public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
-				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+				Collector<Tuple2<Long, Long>> out) throws Exception {
 			
 			long sum = 0;
 			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
 				sum += neighbor.f1.getValue();
 			}
-			return new Tuple2<Long, Long>(vertex.getId(), sum);
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
 		}
 	}
 
 	@SuppressWarnings("serial")
 	private static final class SumInNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
 		Tuple2<Long, Long>> {
-		
-		public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
-				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+				Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+				Collector<Tuple2<Long, Long>> out) throws Exception {
 		
 			long sum = 0;
 			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
 				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
 			}
-			return new Tuple2<Long, Long>(vertex.getId(), sum);
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
 		}
 	}
 
@@ -234,23 +453,120 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 	private static final class SumAllNeighbors implements NeighborsFunctionWithVertexValue<Long, Long, Long, 
 		Tuple2<Long, Long>> {
 
-		public Tuple2<Long, Long> iterateNeighbors(Vertex<Long, Long> vertex,
-		Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
 	
 			long sum = 0;
 			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
 				sum += neighbor.f1.getValue();
 			}
-			return new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue());
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
 		}
 	}
 
 	@SuppressWarnings("serial")
-	private static final class SumOutNeighborsNoValue implements NeighborsFunction<Long, Long, Long, 
-		Tuple2<Long, Long>> {
+	private static final class SumOutNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+					sum += neighbor.f1.getValue();
+			}
+			if(vertex.getId() > 3) {
+				out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumInNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
+			}
+			if(vertex.getId() > 3) {
+				out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighborsIdGreaterThanThree implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f1.getValue();
+			}
+			if(vertex.getId() > 3) {
+				out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumOutNeighborsNoValue implements ReduceNeighborsFunction<Long, Long, Long> {
+
+		@Override
+		public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
+																				  Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
+			long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue();
+			return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0, firstNeighbor.f1,
+					new Vertex<Long, Long>(firstNeighbor.f0, sum));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumInNeighborsNoValue implements ReduceNeighborsFunction<Long, Long, Long> {
+
+		@Override
+		public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
+																				  Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
+			long sum = firstNeighbor.f2.getValue() * firstNeighbor.f1.getValue() +
+					secondNeighbor.f2.getValue() * secondNeighbor.f1.getValue();
+			return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0, firstNeighbor.f1,
+					new Vertex<Long, Long>(firstNeighbor.f0, sum));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighborsNoValue implements ReduceNeighborsFunction<Long, Long, Long> {
+
+		@Override
+		public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
+																				  Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
+			long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue();
+			return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0, firstNeighbor.f1,
+					new Vertex<Long, Long>(firstNeighbor.f0, sum));
+		}
+	}
 
-		public Tuple2<Long, Long> iterateNeighbors(
-				Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
+	@SuppressWarnings("serial")
+	private static final class SumOutNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
 
 			long sum = 0;
 			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
@@ -260,17 +576,21 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 				next = neighborsIterator.next();
 				sum += next.f2.getValue();
 			}
-			return new Tuple2<Long, Long>(next.f0, sum);
+			if(next.f0 > 2) {
+				out.collect(new Tuple2<Long, Long>(next.f0, sum));
+				out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
+			}
 		}
 	}
 
 	@SuppressWarnings("serial")
-	private static final class SumInNeighborsNoValue implements NeighborsFunction<Long, Long, Long, 
-		Tuple2<Long, Long>> {
-		
-		public Tuple2<Long, Long> iterateNeighbors(
-				Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-		
+	private static final class SumInNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
 			long sum = 0;
 			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
 			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
@@ -279,17 +599,21 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 				next = neighborsIterator.next();
 				sum += next.f2.getValue() * next.f1.getValue();
 			}
-			return new Tuple2<Long, Long>(next.f0, sum);
+			if(next.f0 > 2) {
+				out.collect(new Tuple2<Long, Long>(next.f0, sum));
+				out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
+			}
 		}
 	}
 
 	@SuppressWarnings("serial")
-	private static final class SumAllNeighborsNoValue implements NeighborsFunction<Long, Long, Long, 
-		Tuple2<Long, Long>> {
+	private static final class SumAllNeighborsNoValueMultipliedByTwoIdGreaterThanTwo implements NeighborsFunction<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
 
-		public Tuple2<Long, Long> iterateNeighbors(
-				Iterable<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighbors) {
-	
 			long sum = 0;
 			Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> next = null;
 			Iterator<Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>> neighborsIterator =
@@ -298,7 +622,65 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 				next = neighborsIterator.next();
 				sum += next.f2.getValue();
 			}
-			return new Tuple2<Long, Long>(next.f0, sum);
+			if(next.f0 > 2) {
+				out.collect(new Tuple2<Long, Long>(next.f0, sum));
+				out.collect(new Tuple2<Long, Long>(next.f0, sum * 2));
+			}
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumOutNeighborsMultipliedByTwo implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f1.getValue();
+			}
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum * 2));
 		}
 	}
-}
\ No newline at end of file
+
+	@SuppressWarnings("serial")
+	private static final class SumInNeighborsSubtractOne implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f0.getValue() * neighbor.f1.getValue();
+			}
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum));
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum - 1));
+		}
+	}
+
+	@SuppressWarnings("serial")
+	private static final class SumAllNeighborsAddFive implements NeighborsFunctionWithVertexValue<Long, Long, Long,
+			Tuple2<Long, Long>> {
+
+		@Override
+		public void iterateNeighbors(Vertex<Long, Long> vertex,
+									 Iterable<Tuple2<Edge<Long, Long>, Vertex<Long, Long>>> neighbors,
+									 Collector<Tuple2<Long, Long>> out) throws Exception {
+
+			long sum = 0;
+			for (Tuple2<Edge<Long, Long>, Vertex<Long, Long>> neighbor : neighbors) {
+				sum += neighbor.f1.getValue();
+			}
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue()));
+			out.collect(new Tuple2<Long, Long>(vertex.getId(), sum + vertex.getValue() + 5));
+		}
+	}
+}
+