You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/09/15 13:58:34 UTC

[3/4] flink git commit: [FLINK-7273] [gelly] Gelly tests with empty graphs

[FLINK-7273] [gelly] Gelly tests with empty graphs

There exist some tests with empty graphs but the `EmptyGraph` in
`AsmTestBase` contained vertices but no edges. Add a new `EmptyGraph`
without vertices and test both empty graphs for each algorithm.

EmptyGraph now generates the proper TypeInformation (for Edge<> not
Tuple3) which had prevented adding edges due to a union incompatibility.

GraphGeneratorUtils#vertexSet now uses a hash-combine for distinct.

`PageRank` optionally includes zero-degree vertices in the results
(at a performance cost).

This closes #4405


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9437a0ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9437a0ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9437a0ff

Branch: refs/heads/master
Commit: 9437a0ffc04318f6a1a2d19c59f2ae6651b26507
Parents: 6612c0e
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Jul 26 06:23:52 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Sep 14 23:25:25 2017 -0400

----------------------------------------------------------------------
 docs/dev/libs/gelly/library_methods.md          |   1 +
 .../apache/flink/graph/drivers/PageRank.java    |   6 +-
 .../flink/graph/drivers/input/GridGraph.java    |   1 -
 .../flink/graph/generator/CirculantGraph.java   |   2 +-
 .../flink/graph/generator/CompleteGraph.java    |   2 +-
 .../flink/graph/generator/CycleGraph.java       |   2 +-
 .../apache/flink/graph/generator/EchoGraph.java |   4 +-
 .../flink/graph/generator/EmptyGraph.java       |  14 ++-
 .../graph/generator/GraphGeneratorUtils.java    |  46 ++++++---
 .../apache/flink/graph/generator/GridGraph.java |   2 +-
 .../flink/graph/generator/HypercubeGraph.java   |   4 +-
 .../apache/flink/graph/generator/PathGraph.java |   2 +-
 .../graph/generator/SingletonEdgeGraph.java     |   6 +-
 .../apache/flink/graph/generator/StarGraph.java |   8 +-
 .../directed/AverageClusteringCoefficient.java  |   5 +
 .../directed/GlobalClusteringCoefficient.java   |   5 +
 .../clustering/directed/TriadicCensus.java      |   5 +
 .../AverageClusteringCoefficient.java           |   5 +
 .../undirected/GlobalClusteringCoefficient.java |   5 +
 .../clustering/undirected/TriadicCensus.java    |   5 +
 .../flink/graph/library/linkanalysis/HITS.java  |  12 ++-
 .../graph/library/linkanalysis/PageRank.java    |  50 ++++++++-
 .../library/metric/directed/EdgeMetrics.java    |   5 +
 .../library/metric/directed/VertexMetrics.java  |   5 +
 .../library/metric/undirected/EdgeMetrics.java  |   5 +
 .../metric/undirected/VertexMetrics.java        |   5 +
 .../graph/library/similarity/AdamicAdar.java    |   4 +-
 .../org/apache/flink/graph/asm/AsmTestBase.java |  25 ++++-
 .../graph/asm/dataset/ChecksumHashCodeTest.java |  19 +++-
 .../flink/graph/asm/dataset/CollectTest.java    |  19 +++-
 .../flink/graph/asm/dataset/CountTest.java      |  18 +++-
 .../annotate/directed/EdgeDegreesPairTest.java  |  25 +++--
 .../directed/EdgeSourceDegreesTest.java         |  25 +++--
 .../directed/EdgeTargetDegreesTest.java         |  25 +++--
 .../annotate/directed/VertexDegreesTest.java    |  40 ++++---
 .../annotate/directed/VertexInDegreeTest.java   |  52 +++++++---
 .../annotate/directed/VertexOutDegreeTest.java  |  52 +++++++---
 .../annotate/undirected/EdgeDegreePairTest.java |  45 ++++++--
 .../undirected/EdgeSourceDegreeTest.java        |  45 ++++++--
 .../undirected/EdgeTargetDegreeTest.java        |  45 ++++++--
 .../annotate/undirected/VertexDegreeTest.java   |  45 +++++---
 .../filter/undirected/MaximumDegreeTest.java    |  28 +++--
 .../graph/asm/simple/directed/SimplifyTest.java |   3 +-
 .../asm/simple/undirected/SimplifyTest.java     |   6 +-
 .../graph/asm/translate/TranslateTest.java      |   9 +-
 .../graph/generator/CirculantGraphTest.java     |  12 +--
 .../graph/generator/CompleteGraphTest.java      |  12 +--
 .../flink/graph/generator/CycleGraphTest.java   |  12 +--
 .../flink/graph/generator/EchoGraphTest.java    |  24 ++---
 .../flink/graph/generator/EmptyGraphTest.java   |  12 +--
 .../flink/graph/generator/GridGraphTest.java    |  12 +--
 .../graph/generator/HypercubeGraphTest.java     |  12 +--
 .../flink/graph/generator/PathGraphTest.java    |  12 +--
 .../flink/graph/generator/RMatGraphTest.java    |   9 +-
 .../graph/generator/SingletonEdgeGraphTest.java |  12 +--
 .../flink/graph/generator/StarGraphTest.java    |  12 +--
 .../apache/flink/graph/generator/TestUtils.java |   9 +-
 .../AverageClusteringCoefficientTest.java       |  74 ++++++-------
 .../GlobalClusteringCoefficientTest.java        |  73 ++++++-------
 .../LocalClusteringCoefficientTest.java         |  60 +++++++----
 .../clustering/directed/TriadicCensusTest.java  |  34 +++---
 .../directed/TriangleListingTest.java           |  28 +++--
 .../AverageClusteringCoefficientTest.java       |  74 ++++++-------
 .../GlobalClusteringCoefficientTest.java        |  73 ++++++-------
 .../LocalClusteringCoefficientTest.java         |  60 +++++++----
 .../undirected/TriadicCensusTest.java           |  28 +++--
 .../undirected/TriangleListingTest.java         |  31 ++++--
 .../graph/library/linkanalysis/HITSTest.java    | 103 +++++++++++--------
 .../library/linkanalysis/PageRankTest.java      |  97 ++++++++++-------
 .../library/metric/ChecksumHashCodeTest.java    |  27 ++++-
 .../metric/directed/EdgeMetricsTest.java        |  70 ++++++-------
 .../metric/directed/VertexMetricsTest.java      |  95 ++++++++---------
 .../metric/undirected/EdgeMetricsTest.java      |  70 ++++++-------
 .../metric/undirected/VertexMetricsTest.java    |  95 ++++++++---------
 .../library/similarity/AdamicAdarTest.java      |  76 +++++++++++---
 .../library/similarity/JaccardIndexTest.java    |  89 ++++++++++++----
 76 files changed, 1320 insertions(+), 829 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/docs/dev/libs/gelly/library_methods.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/library_methods.md b/docs/dev/libs/gelly/library_methods.md
index de53aaf..93a2c5d 100644
--- a/docs/dev/libs/gelly/library_methods.md
+++ b/docs/dev/libs/gelly/library_methods.md
@@ -330,6 +330,7 @@ The algorithm takes a simple directed graph as input and outputs a `DataSet` of
 hub score, and authority score. Termination is configured by the number of iterations and/or a convergence threshold on
 the iteration sum of the change in scores over all vertices.
 
+* `setIncludeZeroDegreeVertices`: whether to include zero-degree vertices in the iterative computation
 * `setParallelism`: override the operator parallelism
 
 ### PageRank

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
index 299aeed..b9997e4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
@@ -20,13 +20,14 @@ package org.apache.flink.graph.drivers;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
 import org.apache.flink.graph.drivers.parameter.DoubleParameter;
 import org.apache.flink.graph.drivers.parameter.IterationConvergence;
 
 import org.apache.commons.lang3.text.StrBuilder;
 
 /**
- * @see org.apache.flink.graph.library.linkanalysis.PageRank
+ * Driver for {@link org.apache.flink.graph.library.linkanalysis.PageRank}.
  */
 public class PageRank<K, VV, EV>
 extends DriverBase<K, VV, EV> {
@@ -40,6 +41,8 @@ extends DriverBase<K, VV, EV> {
 
 	private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS);
 
+	private BooleanParameter includeZeroDegreeVertices = new BooleanParameter(this, "__include_zero_degree_vertices");
+
 	@Override
 	public String getShortDescription() {
 		return "score vertices by the number and quality of incoming links";
@@ -63,6 +66,7 @@ extends DriverBase<K, VV, EV> {
 					dampingFactor.getValue(),
 					iterationConvergence.getValue().iterations,
 					iterationConvergence.getValue().convergenceThreshold)
+				.setIncludeZeroDegreeVertices(includeZeroDegreeVertices.getValue())
 				.setParallelism(parallelism.getValue().intValue()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
index 1421b1d..fdb144f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
@@ -76,7 +76,6 @@ extends GeneratedGraph {
 
 	@Override
 	protected long vertexCount() {
-		// in Java 8 use Math.multiplyExact(long, long)
 		BigInteger vertexCount = BigInteger.ONE;
 		for (Dimension dimension : dimensions) {
 			vertexCount = vertexCount.multiply(BigInteger.valueOf(dimension.size));

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
index 1dc9e66..0cb3edd 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
@@ -51,7 +51,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 	private final ExecutionEnvironment env;
 
 	// Required configuration
-	private long vertexCount;
+	private final long vertexCount;
 
 	private List<OffsetRange> offsetRanges = new ArrayList<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
index bf7dedf..46d8e67 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 	private final ExecutionEnvironment env;
 
 	// Required configuration
-	private long vertexCount;
+	private final long vertexCount;
 
 	/**
 	 * An undirected {@link Graph} connecting every distinct pair of vertices.

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
index 5b61fa8..5dad4c8 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
@@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 	private final ExecutionEnvironment env;
 
 	// Required configuration
-	private long vertexCount;
+	private final long vertexCount;
 
 	/**
 	 * An undirected {@link Graph} with {@code n} vertices where each vertex

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
index d834df1..4baeb25 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
@@ -46,9 +46,9 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 	private final ExecutionEnvironment env;
 
 	// Required configuration
-	private long vertexCount;
+	private final long vertexCount;
 
-	private long vertexDegree;
+	private final long vertexDegree;
 
 	/**
 	 * An undirected {@link Graph} whose vertices have the same degree.

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
index 466e2d3..30d0d2f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
@@ -18,12 +18,11 @@
 
 package org.apache.flink.graph.generator;
 
+import org.apache.flink.api.common.typeinfo.TypeHint;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.ValueTypeInfo;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
@@ -39,13 +38,13 @@ import java.util.Collections;
 public class EmptyGraph
 extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 
-	public static final int MINIMUM_VERTEX_COUNT = 1;
+	public static final int MINIMUM_VERTEX_COUNT = 0;
 
 	// Required to create the DataSource
 	private final ExecutionEnvironment env;
 
 	// Required configuration
-	private long vertexCount;
+	private final long vertexCount;
 
 	/**
 	 * The {@link Graph} containing no edges.
@@ -63,15 +62,14 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate() {
+		Preconditions.checkState(vertexCount >= 0);
+
 		// Vertices
 		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 
 		// Edges
-		TypeInformation<Edge<LongValue, NullValue>> typeInformation = new TupleTypeInfo<>(
-			ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.NULL_VALUE_TYPE_INFO);
-
 		DataSource<Edge<LongValue, NullValue>> edges = env
-			.fromCollection(Collections.<Edge<LongValue, NullValue>>emptyList(), typeInformation)
+			.fromCollection(Collections.<Edge<LongValue, NullValue>>emptyList(), TypeInformation.of(new TypeHint<Edge<LongValue, NullValue>>(){}))
 				.setParallelism(parallelism)
 				.name("Empty edge set");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
index d5a70f3..61a27c4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
@@ -20,6 +20,9 @@ package org.apache.flink.graph.generator;
 
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
@@ -31,6 +34,9 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.LongValueSequenceIterator;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
 
 /**
  * Utilities for graph generators.
@@ -45,26 +51,34 @@ public class GraphGeneratorUtils {
 	 * @param env the Flink execution environment.
 	 * @param parallelism operator parallelism
 	 * @param vertexCount number of sequential vertex labels
-	 * @return {@link DataSet} of sequentially labeled {@link Vertex Vertices}
+	 * @return {@link DataSet} of sequentially labeled {@link Vertex vertices}
 	 */
 	public static DataSet<Vertex<LongValue, NullValue>> vertexSequence(ExecutionEnvironment env, int parallelism, long vertexCount) {
-		LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount - 1);
-
-		DataSource<LongValue> vertexLabels = env
-			.fromParallelCollection(iterator, LongValue.class)
-				.setParallelism(parallelism)
-				.name("Vertex iterators");
-
-		return vertexLabels
-			.map(new CreateVertex())
-				.setParallelism(parallelism)
-				.name("Vertex sequence");
+		Preconditions.checkArgument(vertexCount >= 0, "Vertex count must be non-negative");
+
+		if (vertexCount == 0) {
+			return env
+				.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Vertex<LongValue, NullValue>>(){}))
+					.setParallelism(parallelism)
+					.name("Empty vertex set");
+		} else {
+			LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount - 1);
+
+			DataSource<LongValue> vertexLabels = env
+				.fromParallelCollection(iterator, LongValue.class)
+					.setParallelism(parallelism)
+					.name("Vertex indices");
+
+			return vertexLabels
+				.map(new CreateVertex())
+					.setParallelism(parallelism)
+					.name("Vertex sequence");
+		}
 	}
 
 	@ForwardedFields("*->f0")
 	private static class CreateVertex
 	implements MapFunction<LongValue, Vertex<LongValue, NullValue>> {
-
 		private Vertex<LongValue, NullValue> vertex = new Vertex<>(null, NullValue.getInstance());
 
 		@Override
@@ -79,13 +93,13 @@ public class GraphGeneratorUtils {
 	// --------------------------------------------------------------------------------------------
 
 	/**
-	 * Generates {@link Vertex Vertices} present in the given set of {@link Edge}s.
+	 * Generates {@link Vertex vertices} present in the given set of {@link Edge}s.
 	 *
 	 * @param edges source {@link DataSet} of {@link Edge}s
 	 * @param parallelism operator parallelism
 	 * @param <K> label type
 	 * @param <EV> edge value type
-	 * @return {@link DataSet} of discovered {@link Vertex Vertices}
+	 * @return {@link DataSet} of discovered {@link Vertex vertices}
 	 *
 	 * @see Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment)
 	 */
@@ -97,6 +111,7 @@ public class GraphGeneratorUtils {
 
 		return vertexSet
 			.distinct()
+			.setCombineHint(CombineHint.HASH)
 				.setParallelism(parallelism)
 				.name("Emit vertex labels");
 	}
@@ -106,7 +121,6 @@ public class GraphGeneratorUtils {
 	 */
 	private static final class EmitSrcAndTarget<K, EV>
 	implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
-
 		private Vertex<K, NullValue> output = new Vertex<>(null, new NullValue());
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
index cae2bc4..1d91b53 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -71,7 +71,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 	public GridGraph addDimension(long size, boolean wrapEndpoints) {
 		Preconditions.checkArgument(size >= 2, "Dimension size must be at least 2");
 
-		vertexCount *= size;
+		vertexCount = Math.multiplyExact(vertexCount, size);
 
 		// prevent duplicate edges
 		if (size == 2) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
index daaaf53..0dc95e7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
@@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 	private final ExecutionEnvironment env;
 
 	// Required configuration
-	private long dimensions;
+	private final long dimensions;
 
 	/**
 	 * An undirected {@code Graph} where edges form an n-dimensional hypercube.
@@ -56,6 +56,8 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate() {
+		Preconditions.checkState(dimensions > 0);
+
 		GridGraph graph = new GridGraph(env);
 
 		for (int i = 0; i < dimensions; i++) {

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
index e61fcd8..bae8633 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
@@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 	private final ExecutionEnvironment env;
 
 	// Required configuration
-	private long vertexCount;
+	private final long vertexCount;
 
 	/**
 	 * An undirected {@link Graph} with {@code n} vertices where each vertex

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
index 159e55d..333c051 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
@@ -42,7 +42,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 	private final ExecutionEnvironment env;
 
 	// Required configuration
-	private long vertexPairCount;
+	private final long vertexPairCount;
 
 	/**
 	 * An undirected {@link Graph} containing one or more isolated two-paths.
@@ -62,8 +62,10 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate() {
+		Preconditions.checkState(vertexPairCount > 0);
+
 		// Vertices
-		long vertexCount = 2 * this.vertexPairCount;
+		long vertexCount = 2 * vertexPairCount;
 
 		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
index 7133320..5cbb256 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
@@ -43,11 +43,11 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 	private final ExecutionEnvironment env;
 
 	// Required configuration
-	private long vertexCount;
+	private final long vertexCount;
 
 	/**
-	 * An undirected {@Graph} with {@code n} vertices where the single central
-	 * node has degree {@code n-1}, connecting to the other {@code n-1}
+	 * An undirected {@link Graph} with {@code n} vertices where the single
+	 * central node has degree {@code n-1}, connecting to the other {@code n-1}
 	 * vertices which have degree {@code 1}.
 	 *
 	 * @param env the Flink execution environment
@@ -63,6 +63,8 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
 
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate() {
+		Preconditions.checkState(vertexCount >= 2);
+
 		// Vertices
 		DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
index 2ff5dc2..1d3b7c6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
@@ -150,6 +150,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		}
 
 		@Override
+		public String toString() {
+			return toPrintableString();
+		}
+
+		@Override
 		public String toPrintableString() {
 			return "vertex count: " + vertexCount
 				+ ", average clustering coefficient: " + averageLocalClusteringCoefficient;

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
index a6b0baa..ae15d40 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
@@ -137,6 +137,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		}
 
 		@Override
+		public String toString() {
+			return toPrintableString();
+		}
+
+		@Override
 		public String toPrintableString() {
 			return "triplet count: " + tripletCount
 				+ ", triangle count: " + triangleCount

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
index 93eadc5..f99df1c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
@@ -504,6 +504,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		}
 
 		@Override
+		public String toString() {
+			return toPrintableString();
+		}
+
+		@Override
 		public String toPrintableString() {
 			NumberFormat nf = NumberFormat.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
index c0ddb05..2885d6e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
@@ -150,6 +150,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		}
 
 		@Override
+		public String toString() {
+			return toPrintableString();
+		}
+
+		@Override
 		public String toPrintableString() {
 			return "vertex count: " + vertexCount
 				+ ", average clustering coefficient: " + averageLocalClusteringCoefficient;

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
index ef212ae..1a7314e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
@@ -136,6 +136,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		}
 
 		@Override
+		public String toString() {
+			return toPrintableString();
+		}
+
+		@Override
 		public String toPrintableString() {
 			return "triplet count: " + tripletCount
 				+ ", triangle count: " + triangleCount

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
index f440098..24dcade 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
@@ -198,6 +198,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		}
 
 		@Override
+		public String toString() {
+			return toPrintableString();
+		}
+
+		@Override
 		public String toPrintableString() {
 			NumberFormat nf = NumberFormat.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
index e59240b..9ec1eea 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
@@ -48,6 +48,7 @@ import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
+import java.util.Iterator;
 
 /**
  * Hyperlink-Induced Topic Search computes two interdependent scores for every
@@ -387,12 +388,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		public void open(Configuration parameters) throws Exception {
 			super.open(parameters);
 
-			Collection<DoubleValue> var;
-			var = getRuntimeContext().getBroadcastVariable(HUBBINESS_SUM_SQUARED);
-			hubbinessRootSumSquared = Math.sqrt(var.iterator().next().getValue());
+			Collection<DoubleValue> hubbinessSumSquared = getRuntimeContext().getBroadcastVariable(HUBBINESS_SUM_SQUARED);
+			Iterator<DoubleValue> hubbinessSumSquaredIterator = hubbinessSumSquared.iterator();
+			this.hubbinessRootSumSquared = hubbinessSumSquaredIterator.hasNext() ? Math.sqrt(hubbinessSumSquaredIterator.next().getValue()) : Double.NaN;
 
-			var = getRuntimeContext().getBroadcastVariable(AUTHORITY_SUM_SQUARED);
-			authorityRootSumSquared = Math.sqrt(var.iterator().next().getValue());
+			Collection<DoubleValue> authoritySumSquared = getRuntimeContext().getBroadcastVariable(AUTHORITY_SUM_SQUARED);
+			Iterator<DoubleValue> authoritySumSquaredIterator = authoritySumSquared.iterator();
+			authorityRootSumSquared = authoritySumSquaredIterator.hasNext() ? Math.sqrt(authoritySumSquaredIterator.next().getValue()) : Double.NaN;
 		}
 
 		@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index 71c37aa..d259fac 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -84,6 +84,9 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	private double convergenceThreshold;
 
+	// Optional configuration
+	private boolean includeZeroDegreeVertices = false;
+
 	/**
 	 * PageRank with a fixed number of iterations.
 	 *
@@ -126,6 +129,42 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		this.convergenceThreshold = convergenceThreshold;
 	}
 
+	/**
+	 * This PageRank implementation properly handles both source and sink
+	 * vertices which have, respectively, only outgoing and incoming edges.
+	 *
+	 * <p>Setting this flag includes "zero-degree" vertices in the PageRank
+	 * computation and result. These vertices are handled the same as other
+	 * "source" vertices (with a consistent score of
+	 * <code>(1 - damping factor) / number of vertices</code>) but only
+	 * affect the scores of other vertices indirectly through the taking of
+	 * this proportional portion of the "random jump" score.
+	 *
+	 * <p>The cost to include zero-degree vertices is a reduce for uniqueness
+	 * on the vertex set followed by an outer join on the vertex degree
+	 * DataSet.
+	 *
+	 * @param includeZeroDegreeVertices whether to include zero-degree vertices in the iterative computation
+	 * @return this
+	 */
+	public PageRank<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
+		this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+
+		return this;
+	}
+
+	@Override
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
+			return false;
+		}
+
+		PageRank rhs = (PageRank) other;
+
+		return dampingFactor == rhs.dampingFactor &&
+			includeZeroDegreeVertices == rhs.includeZeroDegreeVertices;
+	}
+
 	@Override
 	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
 		super.mergeConfiguration(other);
@@ -142,6 +181,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		// vertex degree
 		DataSet<Vertex<K, Degrees>> vertexDegree = input
 			.run(new VertexDegrees<K, VV, EV>()
+				.setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
 				.setParallelism(parallelism));
 
 		// vertex count
@@ -158,7 +198,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		// vertices with zero in-edges
 		DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree
 			.flatMap(new InitializeSourceVertices<>())
-			.withBroadcastSet(vertexCount, VERTEX_COUNT)
 				.setParallelism(parallelism)
 				.name("Initialize source vertex scores");
 
@@ -285,7 +324,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			super.open(parameters);
 
 			Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
-			output.f1 = new DoubleValue(1.0 / vertexCount.iterator().next().getValue());
+			Iterator<LongValue> vertexCountIterator = vertexCount.iterator();
+			output.f1 = new DoubleValue(vertexCountIterator.hasNext() ? 1.0 / vertexCountIterator.next().getValue() : Double.NaN);
 		}
 
 		@Override
@@ -376,11 +416,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			super.open(parameters);
 
 			Collection<Tuple2<T, DoubleValue>> sumOfScores = getRuntimeContext().getBroadcastVariable(SUM_OF_SCORES);
+			Iterator<Tuple2<T, DoubleValue>> sumOfScoresIterator = sumOfScores.iterator();
 			// floating point precision error is also included in sumOfSinks
-			double sumOfSinks = 1 - sumOfScores.iterator().next().f1.getValue();
+			double sumOfSinks = 1 - (sumOfScoresIterator.hasNext() ? sumOfScoresIterator.next().f1.getValue() : 0);
 
 			Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
-			this.vertexCount = vertexCount.iterator().next().getValue();
+			Iterator<LongValue> vertexCountIterator = vertexCount.iterator();
+			this.vertexCount = vertexCountIterator.hasNext() ? vertexCountIterator.next().getValue() : 0;
 
 			this.uniformlyDistributedScore = ((1 - dampingFactor) + dampingFactor * sumOfSinks) / this.vertexCount;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index 7294fd1..796667f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -310,6 +310,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		}
 
 		@Override
+		public String toString() {
+			return toPrintableString();
+		}
+
+		@Override
 		public String toPrintableString() {
 			NumberFormat nf = NumberFormat.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 97ee6fa..87c50ce 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -307,6 +307,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		}
 
 		@Override
+		public String toString() {
+			return toPrintableString();
+		}
+
+		@Override
 		public String toPrintableString() {
 			NumberFormat nf = NumberFormat.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 8c520e6..3392d47 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -283,6 +283,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		}
 
 		@Override
+		public String toString() {
+			return toPrintableString();
+		}
+
+		@Override
 		public String toPrintableString() {
 			NumberFormat nf = NumberFormat.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index 1116149..39fec4a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -257,6 +257,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		}
 
 		@Override
+		public String toString() {
+			return toPrintableString();
+		}
+
+		@Override
 		public String toPrintableString() {
 			NumberFormat nf = NumberFormat.getInstance();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 752e206..701e698 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -410,7 +410,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		@Override
 		public void reduce(Iterable<Tuple3<T, T, FloatValue>> values, Collector<Result<T>> out)
 				throws Exception {
-			float sum = 0;
+			double sum = 0;
 			Tuple3<T, T, FloatValue> edge = null;
 
 			for (Tuple3<T, T, FloatValue> next : values) {
@@ -421,7 +421,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			if (sum >= minimumScore) {
 				output.setVertexId0(edge.f0);
 				output.setVertexId1(edge.f1);
-				output.setAdamicAdarScore(sum);
+				output.setAdamicAdarScore((float) sum);
 				out.collect(output);
 			}
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 1afb5da..1b6acbc 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.generator.CompleteGraph;
 import org.apache.flink.graph.generator.EmptyGraph;
 import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.StarGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
 import org.apache.flink.types.IntValue;
 import org.apache.flink.types.LongValue;
@@ -56,11 +57,17 @@ public class AsmTestBase {
 	// empty graph
 	protected final long emptyGraphVertexCount = 3;
 
-	protected Graph<LongValue, NullValue, NullValue> emptyGraph;
+	protected Graph<LongValue, NullValue, NullValue> emptyGraphWithVertices;
+
+	protected Graph<LongValue, NullValue, NullValue> emptyGraphWithoutVertices;
+
+	// star graph
+	protected final long starGraphVertexCount = 29;
+
+	protected Graph<LongValue, NullValue, NullValue> starGraph;
 
 	@Before
-	public void setup()
-			throws Exception {
+	public void setup() throws Exception {
 		env = ExecutionEnvironment.createCollectionsEnvironment();
 		env.getConfig().enableObjectReuse();
 
@@ -89,8 +96,16 @@ public class AsmTestBase {
 		completeGraph = new CompleteGraph(env, completeGraphVertexCount)
 			.generate();
 
-		// empty graph
-		emptyGraph = new EmptyGraph(env, emptyGraphVertexCount)
+		// empty graph with vertices but no edges
+		emptyGraphWithVertices = new EmptyGraph(env, emptyGraphVertexCount)
+			.generate();
+
+		// empty graph with no vertices or edges
+		emptyGraphWithoutVertices = new EmptyGraph(env, 0)
+			.generate();
+
+		// star graph
+		starGraph = new StarGraph(env, starGraphVertexCount)
 			.generate();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
index 7d82b80..a31ce2e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.graph.asm.dataset;
 
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
@@ -27,6 +29,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -39,15 +42,13 @@ public class ChecksumHashCodeTest {
 	private ExecutionEnvironment env;
 
 	@Before
-	public void setup()
-			throws Exception {
+	public void setup() throws Exception {
 		env = ExecutionEnvironment.createCollectionsEnvironment();
 		env.getConfig().enableObjectReuse();
 	}
 
 	@Test
-	public void testChecksumHashCode()
-			throws Exception {
+	public void testList() throws Exception {
 		List<Long> list = Arrays.asList(ArrayUtils.toObject(
 			new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
 
@@ -58,4 +59,14 @@ public class ChecksumHashCodeTest {
 		assertEquals(list.size(), checksum.getCount());
 		assertEquals(list.size() * (list.size() - 1) / 2, checksum.getChecksum());
 	}
+
+	@Test
+	public void testEmptyList() throws Exception {
+		DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){}));
+
+		Checksum checksum = new ChecksumHashCode<Long>().run(dataset).execute();
+
+		assertEquals(0, checksum.getCount());
+		assertEquals(0, checksum.getChecksum());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
index 29b454b..cfeadce 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.graph.asm.dataset;
 
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
@@ -26,9 +28,11 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
 
 /**
  * Tests for {@link Collect}.
@@ -38,15 +42,13 @@ public class CollectTest {
 	private ExecutionEnvironment env;
 
 	@Before
-	public void setup()
-			throws Exception {
+	public void setup() throws Exception {
 		env = ExecutionEnvironment.createCollectionsEnvironment();
 		env.getConfig().enableObjectReuse();
 	}
 
 	@Test
-	public void testCollect()
-			throws Exception {
+	public void testList() throws Exception {
 		List<Long> list = Arrays.asList(ArrayUtils.toObject(
 			new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
 
@@ -56,4 +58,13 @@ public class CollectTest {
 
 		assertArrayEquals(list.toArray(), collected.toArray());
 	}
+
+	@Test
+	public void testEmptyList() throws Exception {
+		DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){}));
+
+		List<Long> collected = new Collect<Long>().run(dataset).execute();
+
+		assertEquals(0, collected.size());
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
index a1160ce..0167a5f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.graph.asm.dataset;
 
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 
@@ -26,6 +28,7 @@ import org.junit.Before;
 import org.junit.Test;
 
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
 import static org.junit.Assert.assertEquals;
@@ -38,15 +41,13 @@ public class CountTest {
 	private ExecutionEnvironment env;
 
 	@Before
-	public void setup()
-			throws Exception {
+	public void setup() throws Exception {
 		env = ExecutionEnvironment.createCollectionsEnvironment();
 		env.getConfig().enableObjectReuse();
 	}
 
 	@Test
-	public void testCount()
-			throws Exception {
+	public void testList() throws Exception {
 		List<Long> list = Arrays.asList(ArrayUtils.toObject(
 			new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
 
@@ -56,4 +57,13 @@ public class CountTest {
 
 		assertEquals(list.size(), count);
 	}
+
+	@Test
+	public void testEmptyList() throws Exception {
+		DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){}));
+
+		long count = new Count<Long>().run(dataset).execute();
+
+		assertEquals(0, count);
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
index 63bf133..08ba4aa 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -37,12 +37,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link EdgeDegreesPair}.
  */
-public class EdgeDegreesPairTest
-extends AsmTestBase {
+public class EdgeDegreesPairTest extends AsmTestBase {
 
 	@Test
-	public void testWithSimpleGraph()
-			throws Exception {
+	public void testWithSimpleGraph() throws Exception {
 		String expectedResult =
 			"(0,1,((null),(2,2,0),(3,0,3)))\n" +
 			"(0,2,((null),(2,2,0),(3,2,1)))\n" +
@@ -59,8 +57,23 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithRMatGraph()
-			throws Exception {
+	public void testWithEmptyGraphWithVertices() throws Exception {
+		DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = emptyGraphWithVertices
+			.run(new EdgeDegreesPair<>());
+
+		assertEquals(0, degreesPair.collect().size());
+	}
+
+	@Test
+	public void testWithEmptyGraphWithoutVertices() throws Exception {
+		DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = emptyGraphWithoutVertices
+			.run(new EdgeDegreesPair<>());
+
+		assertEquals(0, degreesPair.collect().size());
+	}
+
+	@Test
+	public void testWithRMatGraph() throws Exception {
 		DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = directedRMatGraph(10, 16)
 			.run(new EdgeDegreesPair<>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
index 967cfb2..884e2d5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -37,12 +37,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link EdgeSourceDegrees}.
  */
-public class EdgeSourceDegreesTest
-extends AsmTestBase {
+public class EdgeSourceDegreesTest extends AsmTestBase {
 
 	@Test
-	public void testWithSimpleGraph()
-			throws Exception {
+	public void testWithSimpleGraph() throws Exception {
 		String expectedResult =
 			"(0,1,((null),(2,2,0)))\n" +
 			"(0,2,((null),(2,2,0)))\n" +
@@ -59,8 +57,23 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithRMatGraph()
-			throws Exception {
+	public void testWithEmptyGraphWithVertices() throws Exception {
+		DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = emptyGraphWithVertices
+			.run(new EdgeSourceDegrees<>());
+
+		assertEquals(0, sourceDegrees.collect().size());
+	}
+
+	@Test
+	public void testWithEmptyGraphWithoutVertices() throws Exception {
+		DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = emptyGraphWithoutVertices
+			.run(new EdgeSourceDegrees<>());
+
+		assertEquals(0, sourceDegrees.collect().size());
+	}
+
+	@Test
+	public void testWithRMatGraph() throws Exception {
 		DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = directedRMatGraph(10, 16)
 			.run(new EdgeSourceDegrees<>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
index abb76c4..5afd64c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -37,12 +37,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link EdgeTargetDegrees}.
  */
-public class EdgeTargetDegreesTest
-extends AsmTestBase {
+public class EdgeTargetDegreesTest extends AsmTestBase {
 
 	@Test
-	public void testWithSimpleGraph()
-			throws Exception {
+	public void testWithSimpleGraph() throws Exception {
 		String expectedResult =
 			"(0,1,((null),(3,0,3)))\n" +
 			"(0,2,((null),(3,2,1)))\n" +
@@ -59,8 +57,23 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithRMatGraph()
-			throws Exception {
+	public void testWithEmptyGraphWithVertices() throws Exception {
+		DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> targetDegrees = emptyGraphWithVertices
+			.run(new EdgeTargetDegrees<>());
+
+		assertEquals(0, targetDegrees.collect().size());
+	}
+
+	@Test
+	public void testWithEmptyGraphWithoutVertices() throws Exception {
+		DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> targetDegrees = emptyGraphWithoutVertices
+			.run(new EdgeTargetDegrees<>());
+
+		assertEquals(0, targetDegrees.collect().size());
+	}
+
+	@Test
+	public void testWithRMatGraph() throws Exception {
 		DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> targetDegrees = directedRMatGraph(10, 16)
 			.run(new EdgeTargetDegrees<>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
index 91f354f..7b89fe7 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link VertexDegrees}.
  */
-public class VertexDegreesTest
-extends AsmTestBase {
+public class VertexDegreesTest extends AsmTestBase {
 
 	@Test
-	public void testWithSimpleDirectedGraph()
-			throws Exception {
+	public void testWithDirectedSimpleGraph() throws Exception {
 		DataSet<Vertex<IntValue, Degrees>> degrees = directedSimpleGraph
 			.run(new VertexDegrees<>());
 
@@ -57,8 +55,7 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithSimpleUndirectedGraph()
-			throws Exception {
+	public void testWithUndirectedSimpleGraph() throws Exception {
 		DataSet<Vertex<IntValue, Degrees>> degrees = undirectedSimpleGraph
 			.run(new VertexDegrees<>());
 
@@ -74,17 +71,14 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithEmptyGraph()
-			throws Exception {
-		DataSet<Vertex<LongValue, Degrees>> degrees;
-
-		degrees = emptyGraph
+	public void testWithEmptyGraphWithVertices() throws Exception {
+		DataSet<Vertex<LongValue, Degrees>> degreesWithoutZeroDegreeVertices = emptyGraphWithVertices
 			.run(new VertexDegrees<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(false));
 
-		assertEquals(0, degrees.collect().size());
+		assertEquals(0, degreesWithoutZeroDegreeVertices.collect().size());
 
-		degrees = emptyGraph
+		DataSet<Vertex<LongValue, Degrees>> degreesWithZeroDegreeVertices = emptyGraphWithVertices
 			.run(new VertexDegrees<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));
 
@@ -93,12 +87,26 @@ extends AsmTestBase {
 			"(1,(0,0,0))\n" +
 			"(2,(0,0,0))";
 
-		TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(degreesWithZeroDegreeVertices.collect(), expectedResult);
+	}
+
+	@Test
+	public void testWithEmptyGraphWithoutVertices() throws Exception {
+		DataSet<Vertex<LongValue, Degrees>> degreesWithoutZeroDegreeVertices = emptyGraphWithoutVertices
+			.run(new VertexDegrees<LongValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(false));
+
+		assertEquals(0, degreesWithoutZeroDegreeVertices.collect().size());
+
+		DataSet<Vertex<LongValue, Degrees>> degreesWithZeroDegreeVertices = emptyGraphWithoutVertices
+			.run(new VertexDegrees<LongValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(true));
+
+		assertEquals(0, degreesWithZeroDegreeVertices.collect().size());
 	}
 
 	@Test
-	public void testWithRMatGraph()
-	throws Exception {
+	public void testWithRMatGraph() throws Exception {
 		DataSet<Vertex<LongValue, Degrees>> degrees = directedRMatGraph(10, 16)
 			.run(new VertexDegrees<>());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
index f671cab..7f6c9e3 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
@@ -35,12 +35,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link VertexInDegree}.
  */
-public class VertexInDegreeTest
-extends AsmTestBase {
+public class VertexInDegreeTest extends AsmTestBase {
 
 	@Test
-	public void testWithSimpleGraph()
-			throws Exception {
+	public void testWithDirectedSimpleGraph() throws Exception {
 		DataSet<Vertex<IntValue, LongValue>> inDegree = directedSimpleGraph
 			.run(new VertexInDegree<IntValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));
@@ -57,17 +55,31 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithEmptyGraph()
-			throws Exception {
-		DataSet<Vertex<LongValue, LongValue>> inDegree;
+	public void testWithUndirectedSimpleGraph() throws Exception {
+		DataSet<Vertex<IntValue, LongValue>> inDegree = undirectedSimpleGraph
+			.run(new VertexInDegree<IntValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(true));
+
+		String expectedResult =
+			"(0,2)\n" +
+			"(1,3)\n" +
+			"(2,3)\n" +
+			"(3,4)\n" +
+			"(4,1)\n" +
+			"(5,1)";
 
-		inDegree = emptyGraph
+		TestBaseUtils.compareResultAsText(inDegree.collect(), expectedResult);
+	}
+
+	@Test
+	public void testWithEmptyGraphWithVertices() throws Exception {
+		DataSet<Vertex<LongValue, LongValue>> inDegreeWithoutZeroDegreeVertices = emptyGraphWithVertices
 			.run(new VertexInDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(false));
 
-		assertEquals(0, inDegree.collect().size());
+		assertEquals(0, inDegreeWithoutZeroDegreeVertices.collect().size());
 
-		inDegree = emptyGraph
+		DataSet<Vertex<LongValue, LongValue>> inDegreeWithZeroDegreeVertices = emptyGraphWithVertices
 			.run(new VertexInDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));
 
@@ -76,12 +88,26 @@ extends AsmTestBase {
 			"(1,0)\n" +
 			"(2,0)";
 
-		TestBaseUtils.compareResultAsText(inDegree.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(inDegreeWithZeroDegreeVertices.collect(), expectedResult);
+	}
+
+	@Test
+	public void testWithEmptyGraphWithoutVertices() throws Exception {
+		DataSet<Vertex<LongValue, LongValue>> inDegreeWithoutZeroDegreeVertices = emptyGraphWithoutVertices
+			.run(new VertexInDegree<LongValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(false));
+
+		assertEquals(0, inDegreeWithoutZeroDegreeVertices.collect().size());
+
+		DataSet<Vertex<LongValue, LongValue>> inDegreeWithZeroDegreeVertices = emptyGraphWithoutVertices
+			.run(new VertexInDegree<LongValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(true));
+
+		assertEquals(0, inDegreeWithZeroDegreeVertices.collect().size());
 	}
 
 	@Test
-	public void testWithRMatGraph()
-			throws Exception {
+	public void testWithRMatGraph() throws Exception {
 		DataSet<Vertex<LongValue, LongValue>> inDegree = directedRMatGraph(10, 16)
 			.run(new VertexInDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
index 1517f23..7031d8f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
@@ -35,12 +35,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link VertexOutDegree}.
  */
-public class VertexOutDegreeTest
-extends AsmTestBase {
+public class VertexOutDegreeTest extends AsmTestBase {
 
 	@Test
-	public void testWithSimpleGraph()
-			throws Exception {
+	public void testWithDirectedSimpleGraph() throws Exception {
 		DataSet<Vertex<IntValue, LongValue>> outDegree = directedSimpleGraph
 			.run(new VertexOutDegree<IntValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));
@@ -57,17 +55,31 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithEmptyGraph()
-			throws Exception {
-		DataSet<Vertex<LongValue, LongValue>> outDegree;
+	public void testWithUndirectedSimpleGraph() throws Exception {
+		DataSet<Vertex<IntValue, LongValue>> outDegree = undirectedSimpleGraph
+			.run(new VertexOutDegree<IntValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(true));
+
+		String expectedResult =
+			"(0,2)\n" +
+			"(1,3)\n" +
+			"(2,3)\n" +
+			"(3,4)\n" +
+			"(4,1)\n" +
+			"(5,1)";
 
-		outDegree = emptyGraph
+		TestBaseUtils.compareResultAsText(outDegree.collect(), expectedResult);
+	}
+
+	@Test
+	public void testWithEmptyGraphWithVertices() throws Exception {
+		DataSet<Vertex<LongValue, LongValue>> outDegreeWithoutZeroDegreeVertices = emptyGraphWithVertices
 			.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(false));
 
-		assertEquals(0, outDegree.collect().size());
+		assertEquals(0, outDegreeWithoutZeroDegreeVertices.collect().size());
 
-		outDegree = emptyGraph
+		DataSet<Vertex<LongValue, LongValue>> outDegreeWithZeroDegreeVertices = emptyGraphWithVertices
 			.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));
 
@@ -76,12 +88,26 @@ extends AsmTestBase {
 			"(1,0)\n" +
 			"(2,0)";
 
-		TestBaseUtils.compareResultAsText(outDegree.collect(), expectedResult);
+		TestBaseUtils.compareResultAsText(outDegreeWithZeroDegreeVertices.collect(), expectedResult);
+	}
+
+	@Test
+	public void testWithEmptyGraphWithoutVertices() throws Exception {
+		DataSet<Vertex<LongValue, LongValue>> outDegreeWithoutZeroDegreeVertices = emptyGraphWithoutVertices
+			.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(false));
+
+		assertEquals(0, outDegreeWithoutZeroDegreeVertices.collect().size());
+
+		DataSet<Vertex<LongValue, LongValue>> outDegreeWithZeroDegreeVertices = emptyGraphWithoutVertices
+			.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(true));
+
+		assertEquals(0, outDegreeWithZeroDegreeVertices.collect().size());
 	}
 
 	@Test
-	public void testWithRMatGraph()
-			throws Exception {
+	public void testWithRMatGraph() throws Exception {
 		DataSet<Vertex<LongValue, LongValue>> outDegree = directedRMatGraph(10, 16)
 			.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
index 1cae2e7..95a89c5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
@@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link EdgeDegreePair}.
  */
-public class EdgeDegreePairTest
-extends AsmTestBase {
+public class EdgeDegreePairTest extends AsmTestBase {
 
 	@Test
-	public void testWithSimpleGraph()
-			throws Exception {
+	public void testWithSimpleGraph() throws Exception {
 		String expectedResult =
 			"(0,1,((null),2,3))\n" +
 			"(0,2,((null),2,3))\n" +
@@ -59,7 +57,8 @@ extends AsmTestBase {
 			"(5,3,((null),1,4))";
 
 		DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedSimpleGraph
-			.run(new EdgeDegreePair<>());
+			.run(new EdgeDegreePair<IntValue, NullValue, NullValue>()
+				.setReduceOnTargetId(false));
 
 		TestBaseUtils.compareResultAsText(degreePairOnSourceId.collect(), expectedResult);
 
@@ -71,10 +70,40 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithRMatGraph()
-			throws Exception {
+	public void testWithEmptyGraphWithVertices() throws Exception {
+		DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = emptyGraphWithVertices
+			.run(new EdgeDegreePair<LongValue, NullValue, NullValue>()
+				.setReduceOnTargetId(false));
+
+		assertEquals(0, degreePairOnSourceId.collect().size());
+
+		DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnTargetId = emptyGraphWithVertices
+			.run(new EdgeDegreePair<LongValue, NullValue, NullValue>()
+				.setReduceOnTargetId(true));
+
+		assertEquals(0, degreePairOnTargetId.collect().size());
+	}
+
+	@Test
+	public void testWithEmptyGraphWithoutVertices() throws Exception {
+		DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = emptyGraphWithoutVertices
+			.run(new EdgeDegreePair<LongValue, NullValue, NullValue>()
+				.setReduceOnTargetId(false));
+
+		assertEquals(0, degreePairOnSourceId.collect().size());
+
+		DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnTargetId = emptyGraphWithoutVertices
+			.run(new EdgeDegreePair<LongValue, NullValue, NullValue>()
+				.setReduceOnTargetId(true));
+
+		assertEquals(0, degreePairOnTargetId.collect().size());
+	}
+
+	@Test
+	public void testWithRMatGraph() throws Exception {
 		DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedRMatGraph(10, 16)
-			.run(new EdgeDegreePair<>());
+			.run(new EdgeDegreePair<LongValue, NullValue, NullValue>()
+				.setReduceOnTargetId(false));
 
 		Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>>()
 			.run(degreePairOnSourceId)

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
index 2d8b2e3..3802b4f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
@@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link EdgeSourceDegree}.
  */
-public class EdgeSourceDegreeTest
-extends AsmTestBase {
+public class EdgeSourceDegreeTest extends AsmTestBase {
 
 	@Test
-	public void testWithSimpleGraph()
-			throws Exception {
+	public void testWithSimpleGraph() throws Exception {
 		String expectedResult =
 			"(0,1,((null),2))\n" +
 			"(0,2,((null),2))\n" +
@@ -59,7 +57,8 @@ extends AsmTestBase {
 			"(5,3,((null),1))";
 
 		DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedSimpleGraph
-			.run(new EdgeSourceDegree<>());
+			.run(new EdgeSourceDegree<IntValue, NullValue, NullValue>()
+				.setReduceOnTargetId(false));
 
 		TestBaseUtils.compareResultAsText(sourceDegreeOnSourceId.collect(), expectedResult);
 
@@ -71,10 +70,40 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithRMatGraph()
-			throws Exception {
+	public void testWithEmptyGraphWithVertices() throws Exception {
+		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = emptyGraphWithVertices
+			.run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()
+				.setReduceOnTargetId(false));
+
+		assertEquals(0, sourceDegreeOnSourceId.collect().size());
+
+		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnTargetId = emptyGraphWithVertices
+			.run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()
+				.setReduceOnTargetId(true));
+
+		assertEquals(0, sourceDegreeOnTargetId.collect().size());
+	}
+
+	@Test
+	public void testWithEmptyGraphWithoutVertices() throws Exception {
+		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = emptyGraphWithoutVertices
+			.run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()
+				.setReduceOnTargetId(false));
+
+		assertEquals(0, sourceDegreeOnSourceId.collect().size());
+
+		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnTargetId = emptyGraphWithoutVertices
+			.run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()
+				.setReduceOnTargetId(true));
+
+		assertEquals(0, sourceDegreeOnTargetId.collect().size());
+	}
+
+	@Test
+	public void testWithRMatGraph() throws Exception {
 		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedRMatGraph(10, 16)
-			.run(new EdgeSourceDegree<>());
+			.run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()
+				.setReduceOnTargetId(false));
 
 		Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
 			.run(sourceDegreeOnSourceId)

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
index a7c88a1..51f880b 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
@@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link EdgeTargetDegree}.
  */
-public class EdgeTargetDegreeTest
-extends AsmTestBase {
+public class EdgeTargetDegreeTest extends AsmTestBase {
 
 	@Test
-	public void testWithSimpleGraph()
-			throws Exception {
+	public void testWithSimpleGraph() throws Exception {
 		String expectedResult =
 			"(0,1,((null),3))\n" +
 			"(0,2,((null),3))\n" +
@@ -59,7 +57,8 @@ extends AsmTestBase {
 			"(5,3,((null),4))";
 
 		DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedSimpleGraph
-				.run(new EdgeTargetDegree<>());
+			.run(new EdgeTargetDegree<IntValue, NullValue, NullValue>()
+				.setReduceOnSourceId(false));
 
 		TestBaseUtils.compareResultAsText(targetDegreeOnTargetId.collect(), expectedResult);
 
@@ -71,10 +70,40 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithRMatGraph()
-			throws Exception {
+	public void testWithEmptyGraphWithVertices() throws Exception {
+		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = emptyGraphWithVertices
+			.run(new EdgeTargetDegree<LongValue, NullValue, NullValue>()
+				.setReduceOnSourceId(false));
+
+		assertEquals(0, targetDegreeOnTargetId.collect().size());
+
+		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnSourceId = emptyGraphWithVertices
+			.run(new EdgeTargetDegree<LongValue, NullValue, NullValue>()
+				.setReduceOnSourceId(true));
+
+		assertEquals(0, targetDegreeOnSourceId.collect().size());
+	}
+
+	@Test
+	public void testWithEmptyGraphWithoutVertices() throws Exception {
+		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = emptyGraphWithoutVertices
+			.run(new EdgeTargetDegree<LongValue, NullValue, NullValue>()
+				.setReduceOnSourceId(false));
+
+		assertEquals(0, targetDegreeOnTargetId.collect().size());
+
+		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnSourceId = emptyGraphWithoutVertices
+			.run(new EdgeTargetDegree<LongValue, NullValue, NullValue>()
+				.setReduceOnSourceId(true));
+
+		assertEquals(0, targetDegreeOnSourceId.collect().size());
+	}
+
+	@Test
+	public void testWithRMatGraph() throws Exception {
 		DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedRMatGraph(10, 16)
-			.run(new EdgeSourceDegree<>());
+			.run(new EdgeTargetDegree<LongValue, NullValue, NullValue>()
+				.setReduceOnSourceId(false));
 
 		Checksum checksumOnTargetId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
 			.run(targetDegreeOnTargetId)

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
index bc76bff..49f0007 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
@@ -35,12 +35,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link VertexDegree}.
  */
-public class VertexDegreeTest
-extends AsmTestBase {
+public class VertexDegreeTest extends AsmTestBase {
 
 	@Test
-	public void testWithSimpleGraph()
-			throws Exception {
+	public void testWithSimpleGraph() throws Exception {
 		String expectedResult =
 			"(0,2)\n" +
 			"(1,3)\n" +
@@ -50,7 +48,8 @@ extends AsmTestBase {
 			"(5,1)";
 
 		DataSet<Vertex<IntValue, LongValue>> degreeOnSourceId = undirectedSimpleGraph
-			.run(new VertexDegree<>());
+			.run(new VertexDegree<IntValue, NullValue, NullValue>()
+				.setReduceOnTargetId(false));
 
 		TestBaseUtils.compareResultAsText(degreeOnSourceId.collect(), expectedResult);
 
@@ -62,12 +61,12 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithCompleteGraph()
-			throws Exception {
+	public void testWithCompleteGraph() throws Exception {
 		long expectedDegree = completeGraphVertexCount - 1;
 
 		DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = completeGraph
-			.run(new VertexDegree<>());
+			.run(new VertexDegree<LongValue, NullValue, NullValue>()
+				.setReduceOnTargetId(false));
 
 		for (Vertex<LongValue, LongValue> vertex : degreeOnSourceId.collect()) {
 			assertEquals(expectedDegree, vertex.getValue().getValue());
@@ -83,17 +82,16 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithEmptyGraph()
-			throws Exception {
+	public void testWithEmptyGraphWithVertices() throws Exception {
 		DataSet<Vertex<LongValue, LongValue>> degree;
 
-		degree = emptyGraph
+		degree = emptyGraphWithVertices
 			.run(new VertexDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(false));
 
 		assertEquals(0, degree.collect().size());
 
-		degree = emptyGraph
+		degree = emptyGraphWithVertices
 			.run(new VertexDegree<LongValue, NullValue, NullValue>()
 				.setIncludeZeroDegreeVertices(true));
 
@@ -106,10 +104,27 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithRMatGraph()
-			throws Exception {
+	public void testWithEmptyGraphWithoutVertices() throws Exception {
+		DataSet<Vertex<LongValue, LongValue>> degree;
+
+		degree = emptyGraphWithoutVertices
+			.run(new VertexDegree<LongValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(false));
+
+		assertEquals(0, degree.collect().size());
+
+		degree = emptyGraphWithoutVertices
+			.run(new VertexDegree<LongValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(true));
+
+		assertEquals(0, degree.collect().size());
+	}
+
+	@Test
+	public void testWithRMatGraph() throws Exception {
 		DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = undirectedRMatGraph(10, 16)
-			.run(new VertexDegree<>());
+			.run(new VertexDegree<LongValue, NullValue, NullValue>()
+				.setReduceOnTargetId(false));
 
 		Checksum checksumOnSourceId = new ChecksumHashCode<Vertex<LongValue, LongValue>>()
 			.run(degreeOnSourceId)

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
index 51e7712..f43be9c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
 import org.apache.flink.graph.library.metric.ChecksumHashCode;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
 import org.junit.Test;
@@ -33,12 +34,10 @@ import static org.junit.Assert.assertEquals;
 /**
  * Tests for {@link MaximumDegree}.
  */
-public class MaximumDegreeTest
-extends AsmTestBase {
+public class MaximumDegreeTest extends AsmTestBase {
 
 	@Test
-	public void testWithSimpleGraph()
-			throws Exception {
+	public void testWithSimpleGraph() throws Exception {
 		Graph<IntValue, NullValue, NullValue> graph = undirectedSimpleGraph
 			.run(new MaximumDegree<>(3));
 
@@ -63,8 +62,25 @@ extends AsmTestBase {
 	}
 
 	@Test
-	public void testWithRMatGraph()
-			throws Exception {
+	public void testWithEmptyGraphWithVertices() throws Exception {
+		Graph<LongValue, NullValue, NullValue> graph = emptyGraphWithVertices
+			.run(new MaximumDegree<>(1));
+
+		assertEquals(emptyGraphVertexCount, graph.getVertices().collect().size());
+		assertEquals(0, graph.getEdges().collect().size());
+	}
+
+	@Test
+	public void testWithEmptyGraphWithoutVertices() throws Exception {
+		Graph<LongValue, NullValue, NullValue> graph = emptyGraphWithoutVertices
+			.run(new MaximumDegree<>(1));
+
+		assertEquals(0, graph.getVertices().collect().size());
+		assertEquals(0, graph.getEdges().collect().size());
+	}
+
+	@Test
+	public void testWithRMatGraph() throws Exception {
 		Checksum checksum = undirectedRMatGraph(10, 16)
 			.run(new MaximumDegree<>(16))
 			.run(new ChecksumHashCode<>())

http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
index 751d030..f31c09a 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
@@ -62,8 +62,7 @@ public class SimplifyTest {
 	}
 
 	@Test
-	public void test()
-			throws Exception {
+	public void test() throws Exception {
 		String expectedResult =
 			"(0,1,(null))\n" +
 			"(0,2,(null))\n" +