You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/06/28 19:44:37 UTC

flink git commit: [FLINK-3277] [gelly] Use Value types in Gelly API

Repository: flink
Updated Branches:
  refs/heads/master 10898a90f -> 40749ddcd


[FLINK-3277] [gelly] Use Value types in Gelly API

This closes #1671


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/40749ddc
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/40749ddc
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/40749ddc

Branch: refs/heads/master
Commit: 40749ddcd73c4634d81c2153f64e8934d519be3d
Parents: 10898a9
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Feb 18 10:40:39 2016 -0500
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jun 28 15:04:16 2016 -0400

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   | 24 +++----
 .../flink/graph/examples/GraphMetrics.java      | 15 +++--
 .../graph/scala/examples/GraphMetrics.scala     |  2 +-
 .../org/apache/flink/graph/scala/Graph.scala    | 22 +++---
 .../main/java/org/apache/flink/graph/Graph.java | 19 ++++--
 .../apache/flink/graph/library/GSAPageRank.java | 10 +--
 .../apache/flink/graph/library/PageRank.java    | 10 +--
 .../graph/spargel/ScatterGatherIteration.java   | 70 ++++++++++----------
 .../graph/generator/CompleteGraphTest.java      |  8 +--
 .../flink/graph/generator/CycleGraphTest.java   |  8 +--
 .../flink/graph/generator/EmptyGraphTest.java   |  4 +-
 .../flink/graph/generator/GridGraphTest.java    |  8 +--
 .../graph/generator/HypercubeGraphTest.java     |  8 +--
 .../flink/graph/generator/PathGraphTest.java    |  8 +--
 .../graph/generator/SingletonEdgeGraphTest.java |  8 +--
 .../flink/graph/generator/StarGraphTest.java    |  8 +--
 .../graph/test/operations/DegreesITCase.java    | 25 +++----
 .../operations/DegreesWithExceptionITCase.java  | 11 +--
 18 files changed, 138 insertions(+), 130 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index b4b78cc..d8692d6 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -346,13 +346,13 @@ DataSet<K> getVertexIds()
 DataSet<Tuple2<K, K>> getEdgeIds()
 
 // get a DataSet of <vertex ID, in-degree> pairs for all vertices
-DataSet<Tuple2<K, Long>> inDegrees()
+DataSet<Tuple2<K, LongValue>> inDegrees()
 
 // get a DataSet of <vertex ID, out-degree> pairs for all vertices
-DataSet<Tuple2<K, Long>> outDegrees()
+DataSet<Tuple2<K, LongValue>> outDegrees()
 
 // get a DataSet of <vertex ID, degree> pairs for all vertices, where degree is the sum of in- and out- degrees
-DataSet<Tuple2<K, Long>> getDegrees()
+DataSet<Tuple2<K, LongValue>> getDegrees()
 
 // get the number of vertices
 long numberOfVertices()
@@ -381,13 +381,13 @@ getVertexIds: DataSet[K]
 getEdgeIds: DataSet[(K, K)]
 
 // get a DataSet of <vertex ID, in-degree> pairs for all vertices
-inDegrees: DataSet[(K, Long)]
+inDegrees: DataSet[(K, LongValue)]
 
 // get a DataSet of <vertex ID, out-degree> pairs for all vertices
-outDegrees: DataSet[(K, Long)]
+outDegrees: DataSet[(K, LongValue)]
 
 // get a DataSet of <vertex ID, degree> pairs for all vertices, where degree is the sum of in- and out- degrees
-getDegrees: DataSet[(K, Long)]
+getDegrees: DataSet[(K, LongValue)]
 
 // get the number of vertices
 numberOfVertices: Long
@@ -519,13 +519,13 @@ Note that if the input dataset contains a key multiple times, all Gelly join met
 {% highlight java %}
 Graph<Long, Double, Double> network = ...
 
-DataSet<Tuple2<Long, Long>> vertexOutDegrees = network.outDegrees();
+DataSet<Tuple2<Long, LongValue>> vertexOutDegrees = network.outDegrees();
 
 // assign the transition probabilities as the edge weights
 Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees,
-				new VertexJoinFunction<Double, Long>() {
-					public Double vertexJoin(Double vertexValue, Long inputValue) {
-						return vertexValue / inputValue;
+				new VertexJoinFunction<Double, LongValue>() {
+					public Double vertexJoin(Double vertexValue, LongValue inputValue) {
+						return vertexValue / inputValue.getValue();
 					}
 				});
 {% endhighlight %}
@@ -535,10 +535,10 @@ Graph<Long, Double, Double> networkWithWeights = network.joinWithEdgesOnSource(v
 {% highlight scala %}
 val network: Graph[Long, Double, Double] = ...
 
-val vertexOutDegrees: DataSet[(Long, Long)] = network.outDegrees
+val vertexOutDegrees: DataSet[(Long, LongValue)] = network.outDegrees
 
 // assign the transition probabilities as the edge weights
-val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Double, v2: Long) => v1 / v2)
+val networkWithWeights = network.joinWithEdgesOnSource(vertexOutDegrees, (v1: Double, v2: LongValue) => v1 / v2.getValue)
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
index 9058538..e7b47bf 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/GraphMetrics.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.examples;
 
-import org.apache.flink.graph.examples.utils.ExampleUtils;
 import org.apache.flink.api.common.ProgramDescription;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
@@ -27,6 +26,8 @@ import org.apache.flink.api.java.aggregation.Aggregations;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.examples.utils.ExampleUtils;
+import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
 /**
@@ -66,7 +67,7 @@ public class GraphMetrics implements ProgramDescription {
 		long numEdges = graph.numberOfEdges();
 		
 		/** compute the average node degree **/
-		DataSet<Tuple2<Long, Long>> verticesWithDegrees = graph.getDegrees();
+		DataSet<Tuple2<Long, LongValue>> verticesWithDegrees = graph.getDegrees();
 
 		DataSet<Double> avgNodeDegree = verticesWithDegrees
 				.aggregate(Aggregations.SUM, 1).map(new AvgNodeDegreeMapper(numVertices));
@@ -96,7 +97,7 @@ public class GraphMetrics implements ProgramDescription {
 	}
 
 	@SuppressWarnings("serial")
-	private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, Long>, Double> {
+	private static final class AvgNodeDegreeMapper implements MapFunction<Tuple2<Long, LongValue>, Double> {
 
 		private long numberOfVertices;
 
@@ -104,14 +105,14 @@ public class GraphMetrics implements ProgramDescription {
 			this.numberOfVertices = numberOfVertices;
 		}
 
-		public Double map(Tuple2<Long, Long> sumTuple) {
-			return (double) (sumTuple.f1 / numberOfVertices) ;
+		public Double map(Tuple2<Long, LongValue> sumTuple) {
+			return (double) (sumTuple.f1.getValue() / numberOfVertices) ;
 		}
 	}
 
 	@SuppressWarnings("serial")
-	private static final class ProjectVertexId implements MapFunction<Tuple2<Long,Long>, Long> {
-		public Long map(Tuple2<Long, Long> value) { return value.f0; }
+	private static final class ProjectVertexId implements MapFunction<Tuple2<Long, LongValue>, Long> {
+		public Long map(Tuple2<Long, LongValue> value) { return value.f0; }
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
index f9fa82d..ebf43d4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
+++ b/flink-libraries/flink-gelly-examples/src/main/scala/org/apache/flink/graph/scala/examples/GraphMetrics.scala
@@ -61,7 +61,7 @@ object GraphMetrics {
 
     /** compute the average node degree **/
     val verticesWithDegrees = graph.getDegrees
-    val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2 / numVertices).toDouble)
+    val avgDegree = verticesWithDegrees.sum(1).map(in => (in._2.getValue / numVertices).toDouble)
 
     /** find the vertex with the maximum in-degree **/
     val maxInDegreeVertex = graph.inDegrees.max(1).map(in => in._1)

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
index 165f6c2..f7e13ba 100644
--- a/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
+++ b/flink-libraries/flink-gelly-scala/src/main/scala/org/apache/flink/graph/scala/Graph.scala
@@ -18,24 +18,22 @@
 
 package org.apache.flink.graph.scala
 
-import org.apache.flink.util.Preconditions
 import org.apache.flink.api.common.functions.{FilterFunction, MapFunction}
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.{tuple => jtuple}
 import org.apache.flink.api.scala._
 import org.apache.flink.graph._
 import org.apache.flink.graph.asm.translate.TranslateFunction
-import org.apache.flink.graph.validation.GraphValidator
 import org.apache.flink.graph.gsa.{ApplyFunction, GSAConfiguration, GatherFunction, SumFunction}
+import org.apache.flink.graph.pregel.{ComputeFunction, MessageCombiner, VertexCentricConfiguration}
 import org.apache.flink.graph.spargel.{MessagingFunction, ScatterGatherConfiguration, VertexUpdateFunction}
+import org.apache.flink.graph.validation.GraphValidator
+import org.apache.flink.types.{LongValue, NullValue}
+import org.apache.flink.util.Preconditions
 import org.apache.flink.{graph => jg}
 
 import _root_.scala.collection.JavaConverters._
 import _root_.scala.reflect.ClassTag
-import org.apache.flink.types.NullValue
-import org.apache.flink.graph.pregel.ComputeFunction
-import org.apache.flink.graph.pregel.MessageCombiner
-import org.apache.flink.graph.pregel.VertexCentricConfiguration
 
 object Graph {
 
@@ -803,8 +801,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    *
    * @return A DataSet of Tuple2<vertexId, inDegree>
    */
-  def inDegrees(): DataSet[(K, Long)] = {
-    wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue()))
+  def inDegrees(): DataSet[(K, LongValue)] = {
+    wrap(jgraph.inDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
   }
 
   /**
@@ -812,8 +810,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    *
    * @return A DataSet of Tuple2<vertexId, outDegree>
    */
-  def outDegrees(): DataSet[(K, Long)] = {
-    wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue()))
+  def outDegrees(): DataSet[(K, LongValue)] = {
+    wrap(jgraph.outDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
   }
 
   /**
@@ -821,8 +819,8 @@ TypeInformation : ClassTag](jgraph: jg.Graph[K, VV, EV]) {
    *
    * @return A DataSet of Tuple2<vertexId, degree>
    */
-  def getDegrees(): DataSet[(K, Long)] = {
-    wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1.longValue()))
+  def getDegrees(): DataSet[(K, LongValue)] = {
+    wrap(jgraph.getDegrees).map(javatuple => (javatuple.f0, javatuple.f1))
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
index 3dbb9c4..fe59283 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/Graph.java
@@ -61,6 +61,7 @@ import org.apache.flink.graph.utils.Tuple2ToVertexMap;
 import org.apache.flink.graph.utils.Tuple3ToEdgeMap;
 import org.apache.flink.graph.utils.VertexToTuple2Map;
 import org.apache.flink.graph.validation.GraphValidator;
+import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 
@@ -867,25 +868,31 @@ public class Graph<K, VV, EV> {
 	 * 
 	 * @return A DataSet of {@code Tuple2<vertexId, outDegree>}
 	 */
-	public DataSet<Tuple2<K, Long>> outDegrees() {
+	public DataSet<Tuple2<K, LongValue>> outDegrees() {
 
 		return vertices.coGroup(edges).where(0).equalTo(0).with(new CountNeighborsCoGroup<K, VV, EV>());
 	}
 
 	private static final class CountNeighborsCoGroup<K, VV, EV>
-			implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, Long>> {
+			implements CoGroupFunction<Vertex<K, VV>, Edge<K, EV>, Tuple2<K, LongValue>> {
+		private LongValue degree = new LongValue();
+
+		private Tuple2<K, LongValue> vertexDegree = new Tuple2<>(null, degree);
+
 		@SuppressWarnings("unused")
 		public void coGroup(Iterable<Vertex<K, VV>> vertex,	Iterable<Edge<K, EV>> outEdges,
-				Collector<Tuple2<K, Long>> out) {
+				Collector<Tuple2<K, LongValue>> out) {
 			long count = 0;
 			for (Edge<K, EV> edge : outEdges) {
 				count++;
 			}
+			degree.setValue(count);
 
 			Iterator<Vertex<K, VV>> vertexIterator = vertex.iterator();
 
 			if(vertexIterator.hasNext()) {
-				out.collect(new Tuple2<K, Long>(vertexIterator.next().f0, count));
+				vertexDegree.f0 = vertexIterator.next().f0;
+				out.collect(vertexDegree);
 			} else {
 				throw new NoSuchElementException("The edge src/trg id could not be found within the vertexIds");
 			}
@@ -897,7 +904,7 @@ public class Graph<K, VV, EV> {
 	 * 
 	 * @return A DataSet of {@code Tuple2<vertexId, inDegree>}
 	 */
-	public DataSet<Tuple2<K, Long>> inDegrees() {
+	public DataSet<Tuple2<K, LongValue>> inDegrees() {
 
 		return vertices.coGroup(edges).where(0).equalTo(1).with(new CountNeighborsCoGroup<K, VV, EV>());
 	}
@@ -907,7 +914,7 @@ public class Graph<K, VV, EV> {
 	 * 
 	 * @return A DataSet of {@code Tuple2<vertexId, degree>}
 	 */
-	public DataSet<Tuple2<K, Long>> getDegrees() {
+	public DataSet<Tuple2<K, LongValue>> getDegrees() {
 		return outDegrees().union(inDegrees()).groupBy(0).sum(1);
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
index 324f9c3..ef39395 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -29,6 +29,7 @@ import org.apache.flink.graph.gsa.GSAConfiguration;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.Neighbor;
 import org.apache.flink.graph.gsa.SumFunction;
+import org.apache.flink.types.LongValue;
 
 /**
  * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
@@ -56,8 +57,7 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
 
 	@Override
 	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
-
-		DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
+		DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();
 
 		Graph<K, Double, Double> networkWithWeights = network
 				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
@@ -114,10 +114,10 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
 	}
 
 	@SuppressWarnings("serial")
-	private static final class InitWeights implements EdgeJoinFunction<Double, Long> {
+	private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {
 
-		public Double edgeJoin(Double edgeValue, Long inputValue) {
-			return edgeValue / (double) inputValue;
+		public Double edgeJoin(Double edgeValue, LongValue inputValue) {
+			return edgeValue / (double) inputValue.getValue();
 		}
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index f83b05b..2f1b03b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -29,6 +29,7 @@ import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
 import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
+import org.apache.flink.types.LongValue;
 
 /**
  * This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration.
@@ -56,8 +57,7 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
 
 	@Override
 	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
-
-		DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
+		DataSet<Tuple2<K, LongValue>> vertexOutDegrees = network.outDegrees();
 
 		Graph<K, Double, Double> networkWithWeights = network
 				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
@@ -118,10 +118,10 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
 	}
 
 	@SuppressWarnings("serial")
-	private static final class InitWeights implements EdgeJoinFunction<Double, Long> {
+	private static final class InitWeights implements EdgeJoinFunction<Double, LongValue> {
 
-		public Double edgeJoin(Double edgeValue, Long inputValue) {
-			return edgeValue / (double) inputValue;
+		public Double edgeJoin(Double edgeValue, LongValue inputValue) {
+			return edgeValue / (double) inputValue.getValue();
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
index 165ef1e..fc5c210 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -309,28 +309,28 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	}
 
 	@SuppressWarnings("serial")
-	private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> {
+	private static final class VertexUpdateUdfVVWithDegrees<K, VV, Message> extends VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> {
 
-		private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, Long, Long>, Message> vertexUpdateFunction,
-				TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> resultType) {
+		private VertexUpdateUdfVVWithDegrees(VertexUpdateFunction<K, Tuple3<VV, LongValue, LongValue>, Message> vertexUpdateFunction,
+				TypeInformation<Vertex<K, Tuple3<VV, LongValue, LongValue>>> resultType) {
 			super(vertexUpdateFunction, resultType);
 		}
 		
 		@Override
-		public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> vertex,
-							Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
+		public void coGroup(Iterable<Tuple2<K, Message>> messages, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertex,
+							Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
 
-			final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> vertexIter = vertex.iterator();
+			final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertexIter = vertex.iterator();
 		
 			if (vertexIter.hasNext()) {
-				Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = vertexIter.next();
+				Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = vertexIter.next();
 		
 				@SuppressWarnings("unchecked")
 				Iterator<Tuple2<?, Message>> downcastIter = (Iterator<Tuple2<?, Message>>) (Iterator<?>) messages.iterator();
 				messageIter.setSource(downcastIter);
 
-				vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1);
-				vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2);
+				vertexUpdateFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
+				vertexUpdateFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
 
 				vertexUpdateFunction.setOutputWithDegrees(vertexWithDegrees, out);
 				vertexUpdateFunction.updateVertexFromScatterGatherIteration(vertexWithDegrees, messageIter);
@@ -420,7 +420,7 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
 	@SuppressWarnings("serial")
 	private static final class MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>
-		extends MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> {
+		extends MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> {
 
 		private Vertex<K, VV> nextVertex = new Vertex<K, VV>();
 
@@ -430,19 +430,19 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		}
 
 		@Override
-		public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, Long, Long>>> state,
+		public void coGroup(Iterable<Edge<K, EV>> edges, Iterable<Vertex<K, Tuple3<VV, LongValue, LongValue>>> state,
 				Collector<Tuple2<K, Message>> out) throws Exception {
 
-			final Iterator<Vertex<K, Tuple3<VV, Long, Long>>> stateIter = state.iterator();
+			final Iterator<Vertex<K, Tuple3<VV, LongValue, LongValue>>> stateIter = state.iterator();
 		
 			if (stateIter.hasNext()) {
-				Vertex<K, Tuple3<VV, Long, Long>> vertexWithDegrees = stateIter.next();
+				Vertex<K, Tuple3<VV, LongValue, LongValue>> vertexWithDegrees = stateIter.next();
 
 				nextVertex.setField(vertexWithDegrees.f0, 0);
 				nextVertex.setField(vertexWithDegrees.f1.f0, 1);
 
-				messagingFunction.setInDegree(vertexWithDegrees.f1.f1);
-				messagingFunction.setOutDegree(vertexWithDegrees.f1.f2);
+				messagingFunction.setInDegree(vertexWithDegrees.f1.f1.getValue());
+				messagingFunction.setOutDegree(vertexWithDegrees.f1.f2.getValue());
 
 				messagingFunction.set((Iterator<?>) edges.iterator(), out, vertexWithDegrees.getId());
 				messagingFunction.sendMessages(nextVertex);
@@ -505,13 +505,13 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	 * @return the messaging function
 	 */
 	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
-			DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration,
+			DeltaIteration<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, Tuple3<VV, LongValue, LongValue>>> iteration,
 			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg,
 			DataSet<LongValue> numberOfVertices) {
 
 		// build the messaging function (co group)
 		CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
-		MessagingUdfWithEdgeValues<K, Tuple3<VV, Long, Long>, VV, Message, EV> messenger =
+		MessagingUdfWithEdgeValues<K, Tuple3<VV, LongValue, LongValue>, VV, Message, EV> messenger =
 				new MessagingUdfWithEVsVVWithDegrees<K, VV, Message, EV>(messagingFunction, messageTypeInfo);
 
 		messages = this.edgesWithValue.coGroup(iteration.getWorkset()).where(whereArg)
@@ -626,34 +626,34 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
 		this.updateFunction.setOptDegrees(this.configuration.isOptDegrees());
 
-		DataSet<Tuple2<K, Long>> inDegrees = graph.inDegrees();
-		DataSet<Tuple2<K, Long>> outDegrees = graph.outDegrees();
+		DataSet<Tuple2<K, LongValue>> inDegrees = graph.inDegrees();
+		DataSet<Tuple2<K, LongValue>> outDegrees = graph.outDegrees();
 
-		DataSet<Tuple3<K, Long, Long>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
-				.with(new FlatJoinFunction<Tuple2<K, Long>, Tuple2<K, Long>, Tuple3<K, Long, Long>>() {
+		DataSet<Tuple3<K, LongValue, LongValue>> degrees = inDegrees.join(outDegrees).where(0).equalTo(0)
+				.with(new FlatJoinFunction<Tuple2<K, LongValue>, Tuple2<K, LongValue>, Tuple3<K, LongValue, LongValue>>() {
 
 					@Override
-					public void join(Tuple2<K, Long> first, Tuple2<K, Long> second,	Collector<Tuple3<K, Long, Long>> out) {
-						out.collect(new Tuple3<K, Long, Long>(first.f0, first.f1, second.f1));
+					public void join(Tuple2<K, LongValue> first, Tuple2<K, LongValue> second, Collector<Tuple3<K, LongValue, LongValue>> out) {
+						out.collect(new Tuple3<K, LongValue, LongValue>(first.f0, first.f1, second.f1));
 					}
 				}).withForwardedFieldsFirst("f0;f1").withForwardedFieldsSecond("f1");
 
-		DataSet<Vertex<K, Tuple3<VV, Long, Long>>> verticesWithDegrees = initialVertices
+		DataSet<Vertex<K, Tuple3<VV, LongValue, LongValue>>> verticesWithDegrees = initialVertices
 				.join(degrees).where(0).equalTo(0)
-				.with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K,Long,Long>, Vertex<K, Tuple3<VV, Long, Long>>>() {
+				.with(new FlatJoinFunction<Vertex<K,VV>, Tuple3<K, LongValue, LongValue>, Vertex<K, Tuple3<VV, LongValue, LongValue>>>() {
 					@Override
-					public void join(Vertex<K, VV> vertex, Tuple3<K, Long, Long> degrees,
-									Collector<Vertex<K, Tuple3<VV, Long, Long>>> out) throws Exception {
+					public void join(Vertex<K, VV> vertex, Tuple3<K, LongValue, LongValue> degrees,
+									Collector<Vertex<K, Tuple3<VV, LongValue, LongValue>>> out) throws Exception {
 
-						out.collect(new Vertex<K, Tuple3<VV, Long, Long>>(vertex.getId(),
-								new Tuple3<VV, Long, Long>(vertex.getValue(), degrees.f1, degrees.f2)));
+						out.collect(new Vertex<K, Tuple3<VV, LongValue, LongValue>>(vertex.getId(),
+								new Tuple3<VV, LongValue, LongValue>(vertex.getValue(), degrees.f1, degrees.f2)));
 					}
 				}).withForwardedFieldsFirst("f0");
 
 		// add type info
-		TypeInformation<Vertex<K, Tuple3<VV, Long, Long>>> vertexTypes = verticesWithDegrees.getType();
+		TypeInformation<Vertex<K, Tuple3<VV, LongValue, LongValue>>> vertexTypes = verticesWithDegrees.getType();
 
-		final DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>,	Vertex<K, Tuple3<VV, Long, Long>>> iteration =
+		final DeltaIteration<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, Tuple3<VV, LongValue, LongValue>>> iteration =
 				verticesWithDegrees.iterateDelta(verticesWithDegrees, this.maximumNumberOfIterations, 0);
 				setUpIteration(iteration);
 
@@ -673,11 +673,11 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		}
 
 		@SuppressWarnings({ "unchecked", "rawtypes" })
-		VertexUpdateUdf<K, Tuple3<VV, Long, Long>, Message> updateUdf =
+		VertexUpdateUdf<K, Tuple3<VV, LongValue, LongValue>, Message> updateUdf =
 				new VertexUpdateUdfVVWithDegrees(updateFunction, vertexTypes);
 
 		// build the update function (co group)
-		CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
+		CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, LongValue, LongValue>>> updates =
 				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
 
 		if (this.configuration != null && this.configuration.isOptNumVertices()) {
@@ -687,9 +687,9 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		configureUpdateFunction(updates);
 
 		return iteration.closeWith(updates, updates).map(
-				new MapFunction<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, VV>>() {
+				new MapFunction<Vertex<K, Tuple3<VV, LongValue, LongValue>>, Vertex<K, VV>>() {
 
-					public Vertex<K, VV> map(Vertex<K, Tuple3<VV, Long, Long>> vertex) {
+					public Vertex<K, VV> map(Vertex<K, Tuple3<VV, LongValue, LongValue>> vertex) {
 						return new Vertex<K, VV>(vertex.getId(), vertex.getValue().f0);
 					}
 				});

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
index af47fdc..6c0e094 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CompleteGraphTest.java
@@ -56,10 +56,10 @@ extends AbstractGraphTest {
 		assertEquals(vertexCount, graph.numberOfVertices());
 		assertEquals(vertexCount*(vertexCount-1), graph.numberOfEdges());
 
-		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
-		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
-		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
-		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
 
 		assertEquals(vertexCount - 1, minInDegree);
 		assertEquals(vertexCount - 1, minOutDegree);

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
index fb6799b..ec36aa7 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/CycleGraphTest.java
@@ -55,10 +55,10 @@ extends AbstractGraphTest {
 		assertEquals(vertexCount, graph.numberOfVertices());
 		assertEquals(2 * vertexCount, graph.numberOfEdges());
 
-		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
-		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
-		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
-		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
 
 		assertEquals(2, minInDegree);
 		assertEquals(2, minOutDegree);

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
index bc1ef77..d4a524f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/EmptyGraphTest.java
@@ -54,8 +54,8 @@ extends AbstractGraphTest {
 		assertEquals(vertexCount, graph.numberOfVertices());
 		assertEquals(0, graph.numberOfEdges());
 
-		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
-		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
 
 		assertEquals(0, maxInDegree);
 		assertEquals(0, maxOutDegree);

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
index f3fa7db..9606d1a 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/GridGraphTest.java
@@ -63,10 +63,10 @@ extends AbstractGraphTest {
 		assertEquals(2*3*5*7, graph.numberOfVertices());
 		assertEquals(7 * 2*3*5*7, graph.numberOfEdges());
 
-		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
-		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
-		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
-		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
 
 		assertEquals(7, minInDegree);
 		assertEquals(7, minOutDegree);

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
index 12024be..d723ecb 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/HypercubeGraphTest.java
@@ -57,10 +57,10 @@ extends AbstractGraphTest {
 		assertEquals(1L << dimensions, graph.numberOfVertices());
 		assertEquals(dimensions * (1L << dimensions), graph.numberOfEdges());
 
-		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
-		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
-		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
-		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
 
 		assertEquals(dimensions, minInDegree);
 		assertEquals(dimensions, minOutDegree);

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
index b8a409f..3c3ce8c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/PathGraphTest.java
@@ -55,10 +55,10 @@ extends AbstractGraphTest {
 		assertEquals(vertexCount, graph.numberOfVertices());
 		assertEquals(2 * (vertexCount - 1), graph.numberOfEdges());
 
-		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
-		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
-		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
-		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
 
 		assertEquals(1, minInDegree);
 		assertEquals(1, minOutDegree);

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
index 3877717..44a4d99 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/SingletonEdgeGraphTest.java
@@ -56,10 +56,10 @@ extends AbstractGraphTest {
 		assertEquals(2 * vertexPairCount, graph.numberOfVertices());
 		assertEquals(2 * vertexPairCount, graph.numberOfEdges());
 
-		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
-		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
-		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
-		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
 
 		assertEquals(1, minInDegree);
 		assertEquals(1, minOutDegree);

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
index 2b090db..c656cfb 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/generator/StarGraphTest.java
@@ -57,10 +57,10 @@ extends AbstractGraphTest {
 		assertEquals(vertexCount, graph.numberOfVertices());
 		assertEquals(2 * (vertexCount - 1), graph.numberOfEdges());
 
-		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1;
-		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1;
-		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1;
-		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1;
+		long minInDegree = graph.inDegrees().min(1).collect().get(0).f1.getValue();
+		long minOutDegree = graph.outDegrees().min(1).collect().get(0).f1.getValue();
+		long maxInDegree = graph.inDegrees().max(1).collect().get(0).f1.getValue();
+		long maxOutDegree = graph.outDegrees().max(1).collect().get(0).f1.getValue();
 
 		assertEquals(1, minInDegree);
 		assertEquals(1, minOutDegree);

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
index b2744f9..db2ca0d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesITCase.java
@@ -25,6 +25,7 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.test.util.MultipleProgramsTestBase;
+import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -50,8 +51,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        DataSet<Tuple2<Long,Long>> data =graph.outDegrees();
-        List<Tuple2<Long,Long>> result= data.collect();
+        DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees();
+        List<Tuple2<Long, LongValue>> result = data.collect();
        
         
         expectedResult = "1,2\n" +
@@ -76,8 +77,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
 
         
         
-        DataSet<Tuple2<Long,Long>> data =graph.outDegrees();
-        List<Tuple2<Long,Long>> result= data.collect();
+        DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees();
+        List<Tuple2<Long, LongValue>> result = data.collect();
         
         expectedResult = "1,3\n" +
                 "2,1\n" +
@@ -99,8 +100,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
 	            TestGraphUtils.getLongLongEdgeData(env), env);
 
 
-        DataSet<Tuple2<Long,Long>> data =graph.inDegrees();
-        List<Tuple2<Long,Long>> result= data.collect();
+        DataSet<Tuple2<Long, LongValue>> data = graph.inDegrees();
+        List<Tuple2<Long, LongValue>> result = data.collect();
 	    
 	    expectedResult = "1,1\n" +
 		            "2,1\n" +
@@ -120,8 +121,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeDataWithZeroDegree(env), env);
 
-        DataSet<Tuple2<Long,Long>> data =graph.inDegrees();
-        List<Tuple2<Long,Long>> result= data.collect();
+        DataSet<Tuple2<Long, LongValue>> data = graph.inDegrees();
+        List<Tuple2<Long, LongValue>> result = data.collect();
         
         expectedResult = "1,0\n" +
 	                "2,1\n" +
@@ -142,8 +143,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
         Graph<Long, Long, Long> graph = Graph.fromDataSet(TestGraphUtils.getLongLongVertexData(env),
                 TestGraphUtils.getLongLongEdgeData(env), env);
 
-        DataSet<Tuple2<Long,Long>> data =graph.getDegrees();
-        List<Tuple2<Long,Long>> result= data.collect();
+        DataSet<Tuple2<Long, LongValue>> data = graph.getDegrees();
+        List<Tuple2<Long, LongValue>> result = data.collect();
         
         expectedResult = "1,3\n" +
 	                "2,2\n" +
@@ -164,8 +165,8 @@ public class DegreesITCase extends MultipleProgramsTestBase {
         Graph<Long, NullValue, Long> graph =
                 Graph.fromDataSet(TestGraphUtils.getDisconnectedLongLongEdgeData(env), env);
 
-        DataSet<Tuple2<Long,Long>> data =graph.outDegrees();
-        List<Tuple2<Long,Long>> result= data.collect();
+        DataSet<Tuple2<Long, LongValue>> data = graph.outDegrees();
+        List<Tuple2<Long, LongValue>> result = data.collect();
         
         expectedResult = "1,2\n" +
                 "2,1\n" +

http://git-wip-us.apache.org/repos/asf/flink/blob/40749ddc/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
index d79768f..551a97b 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/test/operations/DegreesWithExceptionITCase.java
@@ -28,6 +28,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.test.TestGraphUtils;
 import org.apache.flink.test.util.ForkableFlinkMiniCluster;
 
+import org.apache.flink.types.LongValue;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -81,7 +82,7 @@ public class DegreesWithExceptionITCase {
 				TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
 
 		try {
-			graph.outDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			graph.outDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
 			env.execute();
 
 			fail("graph.outDegrees() did not fail.");
@@ -105,7 +106,7 @@ public class DegreesWithExceptionITCase {
 				TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
 
 		try {
-			graph.inDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			graph.inDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
 			env.execute();
 
 			fail("graph.inDegrees() did not fail.");
@@ -129,7 +130,7 @@ public class DegreesWithExceptionITCase {
 				TestGraphUtils.getLongLongEdgeInvalidTrgData(env), env);
 
 		try {
-			graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
 			env.execute();
 
 			fail("graph.getDegrees() did not fail.");
@@ -153,7 +154,7 @@ public class DegreesWithExceptionITCase {
 				TestGraphUtils.getLongLongEdgeInvalidSrcData(env), env);
 
 		try {
-			graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
 			env.execute();
 
 			fail("graph.getDegrees() did not fail.");
@@ -177,7 +178,7 @@ public class DegreesWithExceptionITCase {
 				TestGraphUtils.getLongLongEdgeInvalidSrcTrgData(env), env);
 
 		try {
-			graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, Long>>());
+			graph.getDegrees().output(new DiscardingOutputFormat<Tuple2<Long, LongValue>>());
 			env.execute();
 
 			fail("graph.getDegrees() did not fail.");