You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by va...@apache.org on 2015/04/26 14:30:43 UTC

[2/5] flink git commit: [FLINK-1758][gelly] Made reduce methods operate on values

[FLINK-1758][gelly] Made reduce methods operate on values


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/ea9d9a0f
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/ea9d9a0f
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/ea9d9a0f

Branch: refs/heads/master
Commit: ea9d9a0f672e55f6947e2b04e6628eb0167ef1ec
Parents: d015bb0
Author: andralungu <lu...@gmail.com>
Authored: Mon Apr 20 23:09:36 2015 +0200
Committer: vasia <va...@apache.org>
Committed: Sun Apr 26 14:05:38 2015 +0200

----------------------------------------------------------------------
 docs/libs/gelly_guide.md                        |  11 +-
 .../main/java/org/apache/flink/graph/Graph.java | 142 +++++++++++--------
 .../apache/flink/graph/ReduceEdgesFunction.java |   2 +-
 .../flink/graph/ReduceNeighborsFunction.java    |   9 +-
 .../operations/ReduceOnEdgesMethodsITCase.java  |  24 ++--
 .../ReduceOnNeighborMethodsITCase.java          |  22 ++-
 6 files changed, 114 insertions(+), 96 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/ea9d9a0f/docs/libs/gelly_guide.md
----------------------------------------------------------------------
diff --git a/docs/libs/gelly_guide.md b/docs/libs/gelly_guide.md
index 6390d6f..6488029 100644
--- a/docs/libs/gelly_guide.md
+++ b/docs/libs/gelly_guide.md
@@ -320,14 +320,13 @@ DataSet<Tuple2<Long, Long>> verticesWithSum = graph.reduceOnNeighbors(
 				new SumValues(), EdgeDirection.IN);
 
 // user-defined function to sum the neighbor values
-static final class SumValues implements ReduceNeighborsFunction<Long, Long, Double> {
+static final class SumValues implements ReduceNeighborsFunction<Long, Long> {
 
-    public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
-    																		Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
+    public Tuple2<Long, Long> reduceNeighbors(Tuple2<Long, Long> firstNeighbor,
+    										  Tuple2<Long, Long> secondNeighbor) {
 
-    	long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue();
-    	return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0, firstNeighbor.f1,
-    			new Vertex<Long, Long>(firstNeighbor.f0, sum));
+    	long sum = firstNeighbor.f1 + secondNeighbor.f1;
+    	return new Tuple2<Long, Long>(firstNeighbor.f0, sum));
     }
 }
 {% endhighlight %}

http://git-wip-us.apache.org/repos/asf/flink/blob/ea9d9a0f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 48d39b1..e21fd93 100755
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -787,6 +787,21 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
+	private static final class ProjectVertexWithEdgeValueMap<K extends Comparable<K> & Serializable, EV extends Serializable>
+			implements MapFunction<Edge<K, EV>, Tuple2<K, EV>> {
+
+		private int fieldPosition;
+
+		public ProjectVertexWithEdgeValueMap(int position) {
+			this.fieldPosition = position;
+		}
+
+		@SuppressWarnings("unchecked")
+		public Tuple2<K, EV> map(Edge<K, EV> edge) {
+			return new Tuple2<K, EV>((K) edge.getField(fieldPosition),	edge.getValue());
+		}
+	}
+
 	private static final class ApplyGroupReduceFunction<K extends Comparable<K> & Serializable, EV extends Serializable, T>
 			implements GroupReduceFunction<Tuple2<K, Edge<K, EV>>, T>,	ResultTypeQueryable<T> {
 
@@ -814,6 +829,14 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
+	private static final class EmitOneVertexWithEdgeValuePerNode<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+			implements FlatMapFunction<Edge<K, EV>, Tuple2<K, EV>> {
+		public void flatMap(Edge<K, EV> edge, Collector<Tuple2<K, EV>> out) {
+			out.collect(new Tuple2<K, EV>(edge.getSource(), edge.getValue()));
+			out.collect(new Tuple2<K, EV>(edge.getTarget(), edge.getValue()));
+		}
+	}
+
 	private static final class EmitOneEdgeWithNeighborPerNode<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
 			implements FlatMapFunction<Edge<K, EV>, Tuple3<K, K, Edge<K, EV>>> {
 		public void flatMap(Edge<K, EV> edge, Collector<Tuple3<K, K, Edge<K, EV>>> out) {
@@ -1332,27 +1355,50 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 	}
 
-	private static final class ProjectVertexIdJoin<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-			implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+	private static final class ProjectVertexWithNeighborValueJoin<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+			implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple2<K, VV>> {
 
 		private int fieldPosition;
 
-		public ProjectVertexIdJoin(int position) {
+		public ProjectVertexWithNeighborValueJoin(int position) {
 			this.fieldPosition = position;
 		}
 
 		@SuppressWarnings("unchecked")
 		public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex, 
-				Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
+				Collector<Tuple2<K, VV>> out) {
+			out.collect(new Tuple2<K, VV>((K) edge.getField(fieldPosition), otherVertex.getValue()));
+		}
+	}
+
+	private static final class ProjectVertexIdJoin<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+			implements FlatJoinFunction<Edge<K, EV>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+		private int fieldPosition;
+		public ProjectVertexIdJoin(int position) {
+			this.fieldPosition = position;
+		}
+		@SuppressWarnings("unchecked")
+		public void join(Edge<K, EV> edge, Vertex<K, VV> otherVertex,
+						Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
 			out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>((K) edge.getField(fieldPosition), edge, otherVertex));
 		}
 	}
 
+	private static final class ProjectNeighborValue<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
+			implements	FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple2<K, VV>> {
+		@SuppressWarnings("unchecked")
+		public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
+				Collector<Tuple2<K, VV>> out) {
+
+			out.collect(new Tuple2<K, VV>(keysWithEdge.f0, neighbor.getValue()));
+		}
+	}
+
 	private static final class ProjectEdgeWithNeighbor<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
 			implements	FlatJoinFunction<Tuple3<K, K, Edge<K, EV>>, Vertex<K, VV>, Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+		@SuppressWarnings("unchecked")
 		public void join(Tuple3<K, K, Edge<K, EV>> keysWithEdge, Vertex<K, VV> neighbor,
-				Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
-
+						Collector<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> out) {
 			out.collect(new Tuple3<K, Edge<K, EV>, Vertex<K, VV>>(keysWithEdge.f0, keysWithEdge.f2, neighbor));
 		}
 	}
@@ -1433,65 +1479,53 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 *
 	 * @param reduceNeighborsFunction the function to apply to the neighborhood
 	 * @param direction the edge direction (in-, out-, all-)
-	 * @return a dataset containing one value per vertex
+	 * @return a dataset containing one value per vertex(vertex id, vertex value)
 	 * @throws IllegalArgumentException
 	 */
-	public DataSet reduceOnNeighbors(ReduceNeighborsFunction<K, VV, EV> reduceNeighborsFunction,
+	public DataSet<Tuple2<K, VV>> reduceOnNeighbors(ReduceNeighborsFunction<K, VV> reduceNeighborsFunction,
 									EdgeDirection direction) throws IllegalArgumentException {
 		switch (direction) {
 			case IN:
-				// create <edge-sourceVertex> pairs
-				final DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithSources = edges
+				// create <vertex-source value> pairs
+				final DataSet<Tuple2<K, VV>> verticesWithSourceNeighborValues = edges
 						.join(this.vertices).where(0).equalTo(0)
-						.with(new ProjectVertexIdJoin<K, VV, EV>(1));
-				return edgesWithSources.groupBy(0).reduce(new ApplyNeighborReduceFunction<K,VV,EV>(reduceNeighborsFunction))
-						.map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
+						.with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(1));
+				return verticesWithSourceNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(reduceNeighborsFunction));
 			case OUT:
-				// create <edge-targetVertex> pairs
-				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithTargets = edges
+				// create <vertex-target value> pairs
+				DataSet<Tuple2<K, VV>> verticesWithTargetNeighborValues = edges
 						.join(this.vertices).where(1).equalTo(0)
-						.with(new ProjectVertexIdJoin<K, VV, EV>(0));
-				return edgesWithTargets.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV, EV>(reduceNeighborsFunction))
-						.map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
+						.with(new ProjectVertexWithNeighborValueJoin<K, VV, EV>(0));
+				return verticesWithTargetNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(reduceNeighborsFunction));
 			case ALL:
-				// create <edge-sourceOrTargetVertex> pairs
-				DataSet<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> edgesWithNeighbors = edges
+				// create <vertex-neighbor value> pairs
+				DataSet<Tuple2<K, VV>> verticesWithNeighborValues = edges
 						.flatMap(new EmitOneEdgeWithNeighborPerNode<K, VV, EV>())
 						.join(this.vertices).where(1).equalTo(0)
-						.with(new ProjectEdgeWithNeighbor<K, VV, EV>());
+						.with(new ProjectNeighborValue<K, VV, EV>());
 
-				return edgesWithNeighbors.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV, EV>(reduceNeighborsFunction))
-						.map(new ApplyNeighborhoodMapFunction<K, VV, EV>());
+				return verticesWithNeighborValues.groupBy(0).reduce(new ApplyNeighborReduceFunction<K, VV>(reduceNeighborsFunction));
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
 	}
 
-	private static final class ApplyNeighborReduceFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-			implements ReduceFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>>> {
+	private static final class ApplyNeighborReduceFunction<K extends Comparable<K> & Serializable, VV extends Serializable>
+			implements ReduceFunction<Tuple2<K, VV>> {
 
-		private ReduceNeighborsFunction<K, VV, EV> function;
+		private ReduceNeighborsFunction<K, VV> function;
 
-		public ApplyNeighborReduceFunction(ReduceNeighborsFunction<K, VV, EV> fun) {
+		public ApplyNeighborReduceFunction(ReduceNeighborsFunction<K, VV> fun) {
 			this.function = fun;
 		}
 
 		@Override
-		public Tuple3<K, Edge<K, EV>, Vertex<K, VV>> reduce(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> first,
-															Tuple3<K, Edge<K, EV>, Vertex<K, VV>> second) throws Exception {
+		public Tuple2<K, VV> reduce(Tuple2<K, VV> first,
+									Tuple2<K, VV> second) throws Exception {
 			return function.reduceNeighbors(first, second);
 		}
 	}
 
-	public static final class ApplyNeighborhoodMapFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-			implements MapFunction<Tuple3<K, Edge<K, EV>, Vertex<K, VV>> ,Tuple2<K, VV>> {
-
-		@Override
-		public Tuple2<K, VV> map(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> edgesWithSrc) throws Exception {
-			return new Tuple2<K, VV>(edgesWithSrc.f0, edgesWithSrc.f2.getValue());
-		}
-	}
-
 	/**
 	 * Compute an aggregate over the edges of each vertex. The function applied
 	 * on the edges only has access to the vertex id (not the vertex value).
@@ -1500,32 +1534,29 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 	 *            the function to apply to the neighborhood
 	 * @param direction
 	 *            the edge direction (in-, out-, all-)
-	 * @return a dataset containing one value per vertex
+	 * @return a dataset containing one value per vertex(vertex key, edge value)
 	 * @throws IllegalArgumentException
 	 */
-	public DataSet reduceOnEdges(ReduceEdgesFunction<K, EV> reduceEdgesFunction,
+	public DataSet<Tuple2<K, EV>> reduceOnEdges(ReduceEdgesFunction<K, EV> reduceEdgesFunction,
 								EdgeDirection direction) throws IllegalArgumentException {
 
 		switch (direction) {
 			case IN:
-				return edges.map(new ProjectVertexIdMap<K, EV>(1))
-						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
-						.map(new ApplyEdgesMapFunction());
+				return edges.map(new ProjectVertexWithEdgeValueMap<K, EV>(1))
+						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
 			case OUT:
-				return edges.map(new ProjectVertexIdMap<K, EV>(0))
-						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
-						.map(new ApplyEdgesMapFunction());
+				return edges.map(new ProjectVertexWithEdgeValueMap<K, EV>(0))
+						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
 			case ALL:
-				return edges.flatMap(new EmitOneEdgePerNode<K, VV, EV>())
-						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction))
-						.map(new ApplyEdgesMapFunction());
+				return edges.flatMap(new EmitOneVertexWithEdgeValuePerNode<K, VV, EV>())
+						.groupBy(0).reduce(new ApplyReduceFunction<K, EV>(reduceEdgesFunction));
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
 		}
 	}
 
 	private static final class ApplyReduceFunction<K extends Comparable<K> & Serializable, EV extends Serializable>
-			implements ReduceFunction<Tuple2<K, Edge<K, EV>>> {
+			implements ReduceFunction<Tuple2<K, EV>> {
 
 		private ReduceEdgesFunction<K, EV> function;
 
@@ -1534,17 +1565,8 @@ public class Graph<K extends Comparable<K> & Serializable, VV extends Serializab
 		}
 
 		@Override
-		public Tuple2<K, Edge<K, EV>> reduce(Tuple2<K, Edge<K, EV>> first, Tuple2<K, Edge<K, EV>> second) throws Exception {
+		public Tuple2<K, EV> reduce(Tuple2<K, EV> first, Tuple2<K, EV> second) throws Exception {
 			return function.reduceEdges(first, second);
 		}
 	}
-
-	public static final class ApplyEdgesMapFunction<K extends Comparable<K> & Serializable, VV extends Serializable, EV extends Serializable>
-			implements MapFunction<Tuple2<K, Edge<K, EV>> ,Tuple2<K, EV>> {
-
-		@Override
-		public Tuple2<K, EV> map(Tuple2<K, Edge<K, EV>> edge) throws Exception {
-			return new Tuple2<K, EV>(edge.f0, edge.f1.getValue());
-		}
-	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea9d9a0f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
index 53c7934..a411aec 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceEdgesFunction.java
@@ -33,5 +33,5 @@ import java.io.Serializable;
 public interface ReduceEdgesFunction<K extends Comparable<K> & Serializable,
 		EV extends Serializable> extends Function, Serializable {
 
-	Tuple2<K, Edge<K, EV>> reduceEdges(Tuple2<K, Edge<K, EV>> firstEdge, Tuple2<K, Edge<K, EV>> secondEdge);
+	Tuple2<K, EV> reduceEdges(Tuple2<K, EV> firstEdge, Tuple2<K, EV> secondEdge);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea9d9a0f/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
index f5e978f..8115a5d 100644
--- a/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
+++ b/flink-staging/flink-gelly/src/main/java/org/apache/flink/graph/ReduceNeighborsFunction.java
@@ -19,7 +19,7 @@
 package org.apache.flink.graph;
 
 import org.apache.flink.api.common.functions.Function;
-import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.tuple.Tuple2;
 
 import java.io.Serializable;
 
@@ -32,9 +32,8 @@ import java.io.Serializable;
  * @param <VV> the vertex value type
  * @param <EV> the edge value type
  */
-public interface ReduceNeighborsFunction <K extends Comparable<K> & Serializable, VV extends Serializable,
-		EV extends Serializable> extends Function, Serializable {
+public interface ReduceNeighborsFunction <K extends Comparable<K> & Serializable, VV extends Serializable> extends Function, Serializable {
 
-	Tuple3<K, Edge<K, EV>, Vertex<K, VV>> reduceNeighbors(Tuple3<K, Edge<K, EV>, Vertex<K, VV>> firstNeighbor,
-														Tuple3<K, Edge<K, EV>, Vertex<K, VV>> secondNeighbor);
+	Tuple2<K, VV> reduceNeighbors(Tuple2<K, VV> firstNeighbor,
+								Tuple2<K, VV> secondNeighbor);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/ea9d9a0f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
index 3ace49a..8a0e258 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnEdgesMethodsITCase.java
@@ -440,10 +440,10 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 	private static final class SelectMinWeightNeighborNoValue implements ReduceEdgesFunction<Long, Long> {
 
 		@Override
-		public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long, Long>> firstEdge,
-														  Tuple2<Long, Edge<Long, Long>> secondEdge) {
+		public Tuple2<Long, Long> reduceEdges(Tuple2<Long, Long> firstEdge,
+											  Tuple2<Long, Long> secondEdge) {
 
-			if(firstEdge.f1.getValue() < secondEdge.f1.getValue()) {
+			if(firstEdge.f1 < secondEdge.f1) {
 				return firstEdge;
 			} else {
 				return secondEdge;
@@ -456,9 +456,9 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 	private static final class SelectMaxWeightNeighborNoValue implements ReduceEdgesFunction<Long, Long> {
 
 		@Override
-		public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long, Long>> firstEdge,
-														  Tuple2<Long, Edge<Long, Long>> secondEdge) {
-			if(firstEdge.f1.getValue() > secondEdge.f1.getValue()) {
+		public Tuple2<Long, Long> reduceEdges(Tuple2<Long, Long> firstEdge,
+											  Tuple2<Long, Long> secondEdge) {
+			if(firstEdge.f1 > secondEdge.f1) {
 				return firstEdge;
 			} else {
 				return secondEdge;
@@ -474,15 +474,15 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 				Iterable<Edge<Long, Long>> edges, Collector<Tuple2<Long, Long>> out) throws Exception {
 			
 			long weight = Long.MAX_VALUE;
-			long minNeighorId = 0;
+			long minNeighborId = 0;
 			
 			for (Edge<Long, Long> edge: edges) {
 				if (edge.getValue() < weight) {
 					weight = edge.getValue();
-					minNeighorId = edge.getSource();
+					minNeighborId = edge.getSource();
 				}
 			}
-			out.collect(new Tuple2<Long, Long>(v.getId(), minNeighorId));
+			out.collect(new Tuple2<Long, Long>(v.getId(), minNeighborId));
 		}
 	}
 
@@ -490,9 +490,9 @@ public class ReduceOnEdgesMethodsITCase extends MultipleProgramsTestBase {
 	private static final class SelectMinWeightInNeighborNoValue implements ReduceEdgesFunction<Long, Long> {
 
 		@Override
-		public Tuple2<Long, Edge<Long, Long>> reduceEdges(Tuple2<Long, Edge<Long, Long>> firstEdge,
-														  Tuple2<Long, Edge<Long, Long>> secondEdge) {
-			if(firstEdge.f1.getValue() < secondEdge.f1.getValue()) {
+		public Tuple2<Long, Long> reduceEdges(Tuple2<Long, Long> firstEdge,
+											  Tuple2<Long, Long> secondEdge) {
+			if(firstEdge.f1 < secondEdge.f1) {
 				return firstEdge;
 			} else {
 				return secondEdge;

http://git-wip-us.apache.org/repos/asf/flink/blob/ea9d9a0f/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
----------------------------------------------------------------------
diff --git a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
index 5f23569..62d6dc9 100644
--- a/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
+++ b/flink-staging/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/ReduceOnNeighborMethodsITCase.java
@@ -523,14 +523,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	private static final class SumOutNeighborsNoValue implements ReduceNeighborsFunction<Long, Long, Long> {
+	private static final class SumOutNeighborsNoValue implements ReduceNeighborsFunction<Long, Long> {
 
 		@Override
-		public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
-																				  Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
-			long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue();
-			return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0, firstNeighbor.f1,
-					new Vertex<Long, Long>(firstNeighbor.f0, sum));
+		public Tuple2<Long, Long> reduceNeighbors(Tuple2<Long, Long> firstNeighbor,
+												  Tuple2<Long, Long> secondNeighbor) {
+			long sum = firstNeighbor.f1 + secondNeighbor.f1;
+			return new Tuple2<Long, Long>(firstNeighbor.f0, sum);
 		}
 	}
 
@@ -554,14 +553,13 @@ public class ReduceOnNeighborMethodsITCase extends MultipleProgramsTestBase {
 	}
 
 	@SuppressWarnings("serial")
-	private static final class SumAllNeighborsNoValue implements ReduceNeighborsFunction<Long, Long, Long> {
+	private static final class SumAllNeighborsNoValue implements ReduceNeighborsFunction<Long, Long> {
 
 		@Override
-		public Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> reduceNeighbors(Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> firstNeighbor,
-																				  Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>> secondNeighbor) {
-			long sum = firstNeighbor.f2.getValue() + secondNeighbor.f2.getValue();
-			return new Tuple3<Long, Edge<Long, Long>, Vertex<Long, Long>>(firstNeighbor.f0, firstNeighbor.f1,
-					new Vertex<Long, Long>(firstNeighbor.f0, sum));
+		public Tuple2<Long, Long> reduceNeighbors(Tuple2<Long, Long> firstNeighbor,
+												  Tuple2<Long, Long> secondNeighbor) {
+			long sum = firstNeighbor.f1 + secondNeighbor.f1;
+			return new Tuple2<Long, Long>(firstNeighbor.f0, sum);
 		}
 	}