You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/06/02 14:27:39 UTC

[1/2] flink git commit: [FLINK-3806] [gelly] Revert use of DataSet.count()

Repository: flink
Updated Branches:
  refs/heads/master b201f8664 -> 65545c2ed


[FLINK-3806] [gelly] Revert use of DataSet.count()

This closes #2036


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36ad78c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36ad78c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36ad78c0

Branch: refs/heads/master
Commit: 36ad78c0821fdae0a69371c67602dd2a7955e4a8
Parents: b201f86
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed May 25 11:06:01 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Jun 2 09:11:19 2016 -0400

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  1 -
 .../graph/library/HITSAlgorithmITCase.java      | 28 --------
 .../flink/graph/library/PageRankITCase.java     | 15 ++--
 .../graph/gsa/GatherSumApplyIteration.java      | 31 +++++++--
 .../apache/flink/graph/library/GSAPageRank.java | 52 +++-----------
 .../flink/graph/library/HITSAlgorithm.java      | 54 ++-------------
 .../apache/flink/graph/library/PageRank.java    | 55 ++++-----------
 .../graph/spargel/ScatterGatherIteration.java   | 73 ++++++++++++++------
 .../apache/flink/graph/utils/GraphUtils.java    | 58 ++++++++++++++++
 9 files changed, 171 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index 0d3e594..aadbd44 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -1967,7 +1967,6 @@ The constructors take the following parameters:
 
 * `beta`: the damping factor.
 * `maxIterations`: the maximum number of iterations to run.
-* `numVertices`: the number of vertices in the input. If known beforehand, is it advised to provide this argument to speed up execution.
 
 ### GSA PageRank
 

http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java
index 019b851..1887725 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java
@@ -56,20 +56,6 @@ public class HITSAlgorithmITCase extends MultipleProgramsTestBase{
 	}
 
 	@Test
-	public void testHITSWithTenIterationsAndNumOfVertices() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Double, NullValue> graph = Graph.fromDataSet(
-				HITSData.getVertexDataSet(env),
-				HITSData.getEdgeDataSet(env),
-				env);
-
-		List<Vertex<Long, Tuple2<DoubleValue, DoubleValue>>> result = graph.run(new HITSAlgorithm<Long, Double, NullValue>(10, 5)).collect();
-		
-		compareWithDelta(result, 1e-7);
-	}
-
-	@Test
 	public void testHITSWithConvergeThreshold() throws Exception {
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 
@@ -83,20 +69,6 @@ public class HITSAlgorithmITCase extends MultipleProgramsTestBase{
 		compareWithDelta(result, 1e-7);
 	}
 
-	@Test
-	public void testHITSWithConvergeThresholdAndNumOfVertices() throws Exception {
-		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
-		Graph<Long, Double, NullValue> graph = Graph.fromDataSet(
-				HITSData.getVertexDataSet(env),
-				HITSData.getEdgeDataSet(env),
-				env);
-
-		List<Vertex<Long, Tuple2<DoubleValue, DoubleValue>>> result = graph.run(new HITSAlgorithm<Long, Double, NullValue>(1e-7, 5)).collect();
-
-		compareWithDelta(result, 1e-7);
-	}
-
 	private void compareWithDelta(List<Vertex<Long, Tuple2<DoubleValue, DoubleValue>>> result, double delta) {
 
 		String resultString = "";

http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
index 034bcd5..e3e8f08 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
@@ -18,9 +18,6 @@
 
 package org.apache.flink.graph.library;
 
-import java.util.Arrays;
-import java.util.List;
-
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.graph.Graph;
@@ -32,6 +29,9 @@ import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.junit.runners.Parameterized;
 
+import java.util.Arrays;
+import java.util.List;
+
 @RunWith(Parameterized.class)
 public class PageRankITCase extends MultipleProgramsTestBase {
 
@@ -72,9 +72,9 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
 				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
 
-        List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 5, 3))
+        List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
         		.collect();
-        
+
         compareWithDelta(result, 0.01);
 	}
 
@@ -85,14 +85,13 @@ public class PageRankITCase extends MultipleProgramsTestBase {
 		Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
 				PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
 
-        List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3))
+        List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
         		.collect();
         
         compareWithDelta(result, 0.01);
 	}
 
-	private void compareWithDelta(List<Vertex<Long, Double>> result,
-																double delta) {
+	private void compareWithDelta(List<Vertex<Long, Double>> result, double delta) {
 
 		String resultString = "";
         for (Vertex<Long, Double> v : result) {

http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index d092086..d1b12f9 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -41,9 +41,12 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.GraphUtils;
+import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collection;
 import java.util.Map;
 
 /**
@@ -125,12 +128,11 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 
 		// check whether the numVertices option is set and, if so, compute the total number of vertices
 		// and set it within the gather, sum and apply functions
+
+		DataSet<LongValue> numberOfVertices = null;
 		if (this.configuration != null && this.configuration.isOptNumVertices()) {
 			try {
-				long numberOfVertices = graph.numberOfVertices();
-				gather.setNumberOfVertices(numberOfVertices);
-				sum.setNumberOfVertices(numberOfVertices);
-				apply.setNumberOfVertices(numberOfVertices);
+				numberOfVertices = GraphUtils.count(this.vertexDataSet);
 			} catch (Exception e) {
 				e.printStackTrace();
 			}
@@ -203,6 +205,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 			for (Tuple2<String, DataSet<?>> e : this.configuration.getGatherBcastVars()) {
 				gatherMapOperator = gatherMapOperator.withBroadcastSet(e.f1, e.f0);
 			}
+			if (this.configuration.isOptNumVertices()) {
+				gatherMapOperator = gatherMapOperator.withBroadcastSet(numberOfVertices, "number of vertices");
+			}
 		}
 		DataSet<Tuple2<K, M>> gatheredSet = gatherMapOperator;
 
@@ -215,6 +220,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 			for (Tuple2<String, DataSet<?>> e : this.configuration.getSumBcastVars()) {
 				sumReduceOperator = sumReduceOperator.withBroadcastSet(e.f1, e.f0);
 			}
+			if (this.configuration.isOptNumVertices()) {
+				sumReduceOperator = sumReduceOperator.withBroadcastSet(numberOfVertices, "number of vertices");
+			}
 		}
 		DataSet<Tuple2<K, M>> summedSet = sumReduceOperator;
 
@@ -231,6 +239,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 			for (Tuple2<String, DataSet<?>> e : this.configuration.getApplyBcastVars()) {
 				appliedSet = appliedSet.withBroadcastSet(e.f1, e.f0);
 			}
+			if (this.configuration.isOptNumVertices()) {
+				appliedSet = appliedSet.withBroadcastSet(numberOfVertices, "number of vertices");
+			}
 		}
 
 		// let the operator know that we preserve the key field
@@ -289,6 +300,10 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
+			if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
+				Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
+				this.gatherFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+			}
 			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
 				this.gatherFunction.init(getIterationRuntimeContext());
 			}
@@ -327,6 +342,10 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
+			if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
+				Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
+				this.sumFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+			}
 			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
 				this.sumFunction.init(getIterationRuntimeContext());
 			}
@@ -365,6 +384,10 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
+			if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
+				Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
+				this.applyFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+			}
 			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
 				this.applyFunction.init(getIterationRuntimeContext());
 			}

http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
index 99624ca..324f9c3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -25,6 +25,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GSAConfiguration;
 import org.apache.flink.graph.gsa.GatherFunction;
 import org.apache.flink.graph.gsa.Neighbor;
 import org.apache.flink.graph.gsa.SumFunction;
@@ -32,22 +33,17 @@ import org.apache.flink.graph.gsa.SumFunction;
 /**
  * This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
  * The user can define the damping factor and the maximum number of iterations.
- * If the number of vertices of the input graph is known, it should be provided as a parameter
- * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
- * 
+ *
  * The implementation assumes that each page has at least one incoming and one outgoing link.
  */
 public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
 
 	private double beta;
 	private int maxIterations;
-	private long numberOfVertices;
 
 	/**
 	 * Creates an instance of the GSA PageRank algorithm.
-	 * If the number of vertices of the input graph is known,
-	 * use the {@link GSAPageRank#GSAPageRank(double, long, int)} constructor instead.
-	 * 
+	 *
 	 * The implementation assumes that each page has at least one incoming and one outgoing link.
 	 * 
 	 * @param beta the damping factor
@@ -58,37 +54,19 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
 		this.maxIterations = maxIterations;
 	}
 
-	/**
-	 * Creates an instance of the GSA PageRank algorithm.
-	 * If the number of vertices of the input graph is known,
-	 * use the {@link GSAPageRank#GSAPageRank(double, int)} constructor instead.
-	 * 
-	 * The implementation assumes that each page has at least one incoming and one outgoing link.
-	 * 
-	 * @param beta the damping factor
-	 * @param maxIterations the maximum number of iterations
-	 * @param numVertices the number of vertices in the input
-	 */
-	public GSAPageRank(double beta, long numVertices, int maxIterations) {
-		this.beta = beta;
-		this.numberOfVertices = numVertices;
-		this.maxIterations = maxIterations;
-	}
-
 	@Override
 	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
 
-		if (numberOfVertices == 0) {
-			numberOfVertices = network.numberOfVertices();
-		}
-
 		DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
 
 		Graph<K, Double, Double> networkWithWeights = network
 				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
 
-		return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
-				new UpdateRanks<K>(beta, numberOfVertices), maxIterations)
+		GSAConfiguration parameters = new GSAConfiguration();
+		parameters.setOptNumVertices(true);
+
+		return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(), new SumRanks(),
+				new UpdateRanks<K>(beta), maxIterations, parameters)
 				.getVertices();
 	}
 
@@ -99,18 +77,12 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
 	@SuppressWarnings("serial")
 	private static final class GatherRanks extends GatherFunction<Double, Double, Double> {
 
-		long numberOfVertices;
-
-		public GatherRanks(long numberOfVertices) {
-			this.numberOfVertices = numberOfVertices;
-		}
-
 		@Override
 		public Double gather(Neighbor<Double, Double> neighbor) {
 			double neighborRank = neighbor.getNeighborValue();
 
 			if(getSuperstepNumber() == 1) {
-				neighborRank = 1.0 / numberOfVertices;
+				neighborRank = 1.0 / this.getNumberOfVertices();
 			}
 
 			return neighborRank * neighbor.getEdgeValue();
@@ -130,16 +102,14 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
 	private static final class UpdateRanks<K> extends ApplyFunction<K, Double, Double> {
 
 		private final double beta;
-		private final long numVertices;
 
-		public UpdateRanks(double beta, long numberOfVertices) {
+		public UpdateRanks(double beta) {
 			this.beta = beta;
-			this.numVertices = numberOfVertices;
 		}
 
 		@Override
 		public void apply(Double rankSum, Double currentValue) {
-			setResult((1-beta)/numVertices + beta * rankSum);
+			setResult((1-beta)/this.getNumberOfVertices() + beta * rankSum);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
index 1ea367e..39e9487 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
@@ -42,8 +42,6 @@ import org.apache.flink.util.Preconditions;
  * represented a page that is linked by many different hubs.
  * Each vertex has a value of Tuple2 type, the first field is hub score and the second field is authority score.
  * The implementation sets same score to every vertex and adds the reverse edge to every edge at the beginning. 
- * If the number of vertices of the input graph is known, it should be provided as a parameter
- * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
  * <p>
  *
  * @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
@@ -54,7 +52,6 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
 	private final static double MINIMUMTHRESHOLD = 1e-9;
 
 	private int maxIterations;
-	private long numberOfVertices;
 	private double convergeThreshold;
 
 	/**
@@ -76,26 +73,6 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
 	}
 
 	/**
-	 * Create an instance of HITS algorithm.
-	 * 
-	 * @param maxIterations    the maximum number of iterations
-	 * @param numberOfVertices the number of vertices in the graph
-	 */
-	public HITSAlgorithm(int maxIterations, long numberOfVertices) {
-		this(maxIterations, MINIMUMTHRESHOLD, numberOfVertices);
-	}
-
-	/**
-	 * Create an instance of HITS algorithm.
-	 * 
-	 * @param convergeThreshold convergence threshold for sum of scores to control whether the iteration should be stopped
-	 * @param numberOfVertices  the number of vertices in the graph
-	 */
-	public HITSAlgorithm(double convergeThreshold, long numberOfVertices) {
-		this(MAXIMUMITERATION, convergeThreshold, numberOfVertices);
-	}
-
-	/**
 	 * Creates an instance of HITS algorithm.
 	 *
 	 * @param maxIterations     the maximum number of iterations
@@ -108,26 +85,8 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
 		this.convergeThreshold = convergeThreshold;
 	}
 
-	/**
-	 * Creates an instance of HITS algorithm.
-	 *
-	 * @param maxIterations     the maximum number of iterations
-	 * @param convergeThreshold convergence threshold for sum of scores to control whether the iteration should be stopped
-	 * @param numberOfVertices  the number of vertices in the graph
-	 */
-	public HITSAlgorithm(int maxIterations, double convergeThreshold, long numberOfVertices) {
-		this(maxIterations, convergeThreshold);
-		Preconditions.checkArgument(numberOfVertices > 0, "Number of vertices must be greater than zero.");
-		this.numberOfVertices = numberOfVertices;
-	}
-
 	@Override
 	public DataSet<Vertex<K, Tuple2<DoubleValue, DoubleValue>>> run(Graph<K, VV, EV> graph) throws Exception {
-
-		if (numberOfVertices == 0) {
-			numberOfVertices = graph.numberOfVertices();
-		}
-
 		Graph<K, Tuple2<DoubleValue, DoubleValue>, Boolean> newGraph = graph
 				.mapEdges(new AuthorityEdgeMapper<K, EV>())
 				.union(graph.reverse().mapEdges(new HubEdgeMapper<K, EV>()))
@@ -135,12 +94,13 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
 
 		ScatterGatherConfiguration parameter = new ScatterGatherConfiguration();
 		parameter.setDirection(EdgeDirection.OUT);
+		parameter.setOptNumVertices(true);
 		parameter.registerAggregator("updatedValueSum", new DoubleSumAggregator());
 		parameter.registerAggregator("authorityValueSum", new DoubleSumAggregator());
 		parameter.registerAggregator("diffValueSum", new DoubleSumAggregator());
 
 		return newGraph
-				.runScatterGatherIteration(new VertexUpdate<K>(maxIterations, convergeThreshold, numberOfVertices),
+				.runScatterGatherIteration(new VertexUpdate<K>(maxIterations, convergeThreshold),
 						new MessageUpdate<K>(maxIterations), maxIterations, parameter)
 				.getVertices();
 	}
@@ -153,15 +113,13 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
 	public static final class VertexUpdate<K> extends VertexUpdateFunction<K, Tuple2<DoubleValue, DoubleValue>, Double> {
 		private int maxIteration;
 		private double convergeThreshold;
-		private long numberOfVertices;
 		private DoubleSumAggregator updatedValueSumAggregator;
 		private DoubleSumAggregator authoritySumAggregator;
 		private DoubleSumAggregator diffSumAggregator;
 
-		public VertexUpdate(int maxIteration, double convergeThreshold, long numberOfVertices) {
+		public VertexUpdate(int maxIteration, double convergeThreshold) {
 			this.maxIteration = maxIteration;
 			this.convergeThreshold = convergeThreshold;
-			this.numberOfVertices = numberOfVertices;
 		}
 
 		@Override
@@ -198,9 +156,9 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
 
 					//in the first iteration, the diff is the authority value of each vertex
 					double previousAuthAverage = 1.0;
-					double diffValueSum = 1.0 * numberOfVertices;
+					double diffValueSum = 1.0 * getNumberOfVertices();
 					if (getSuperstepNumber() > 1) {
-						previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / numberOfVertices;
+						previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / getNumberOfVertices();
 						diffValueSum = ((DoubleValue) getPreviousIterationAggregate("diffValueSum")).getValue();
 					}
 					authoritySumAggregator.aggregate(previousAuthAverage);
@@ -218,7 +176,7 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
 					newHubValue.setValue(updateValue);
 					newAuthorityValue.setValue(newAuthorityValue.getValue() / iterationValueSum);
 					authoritySumAggregator.aggregate(newAuthorityValue.getValue());
-					double previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / numberOfVertices;
+					double previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / getNumberOfVertices();
 
 					// count the diff value of sum of authority scores
 					diffSumAggregator.aggregate((previousAuthAverage - newAuthorityValue.getValue()));

http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 9890a7c..f83b05b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -27,27 +27,23 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.spargel.MessageIterator;
 import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
 import org.apache.flink.graph.spargel.VertexUpdateFunction;
 
 /**
  * This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration.
  * The user can define the damping factor and the maximum number of iterations.
- * If the number of vertices of the input graph is known, it should be provided as a parameter
- * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
- * 
+ *
  * The implementation assumes that each page has at least one incoming and one outgoing link.
  */
 public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
 
 	private double beta;
 	private int maxIterations;
-	private long numberOfVertices;
 
 	/**
 	 * Creates an instance of the PageRank algorithm.
-	 * If the number of vertices of the input graph is known,
-	 * use the {@link PageRank#PageRank(double, long, int)} constructor instead.
-	 * 
+	 *
 	 * The implementation assumes that each page has at least one incoming and one outgoing link.
 	 * 
 	 * @param beta the damping factor
@@ -56,40 +52,21 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
 	public PageRank(double beta, int maxIterations) {
 		this.beta = beta;
 		this.maxIterations = maxIterations;
-		this.numberOfVertices = 0;
-	}
-
-	/**
-	 * Creates an instance of the PageRank algorithm.
-	 * If the number of vertices of the input graph is known,
-	 * use the {@link PageRank#PageRank(double, int)} constructor instead.
-	 * 
-	 * The implementation assumes that each page has at least one incoming and one outgoing link.
-	 * 
-	 * @param beta the damping factor
-	 * @param maxIterations the maximum number of iterations
-	 * @param numVertices the number of vertices in the input
-	 */
-	public PageRank(double beta, long numVertices, int maxIterations) {
-		this.beta = beta;
-		this.maxIterations = maxIterations;
-		this.numberOfVertices = numVertices;
 	}
 
 	@Override
 	public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
 
-		if (numberOfVertices == 0) {
-			numberOfVertices = network.numberOfVertices();
-		}
-
 		DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
 
 		Graph<K, Double, Double> networkWithWeights = network
 				.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
 
-		return networkWithWeights.runScatterGatherIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
-				new RankMessenger<K>(numberOfVertices), maxIterations)
+		ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
+		parameters.setOptNumVertices(true);
+
+		return networkWithWeights.runScatterGatherIteration(new VertexRankUpdater<K>(beta),
+				new RankMessenger<K>(), maxIterations, parameters)
 				.getVertices();
 	}
 
@@ -101,11 +78,9 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
 	public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
 
 		private final double beta;
-		private final long numVertices;
-		
-		public VertexRankUpdater(double beta, long numberOfVertices) {
+
+		public VertexRankUpdater(double beta) {
 			this.beta = beta;
-			this.numVertices = numberOfVertices;
 		}
 
 		@Override
@@ -116,7 +91,7 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
 			}
 
 			// apply the dampening factor / random jump
-			double newRank = (beta * rankSum) + (1 - beta) / numVertices;
+			double newRank = (beta * rankSum) + (1 - beta) / this.getNumberOfVertices();
 			setNewVertexValue(newRank);
 		}
 	}
@@ -129,17 +104,11 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
 	@SuppressWarnings("serial")
 	public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
 
-		private final long numVertices;
-
-		public RankMessenger(long numberOfVertices) {
-			this.numVertices = numberOfVertices;
-		}
-
 		@Override
 		public void sendMessages(Vertex<K, Double> vertex) {
 			if (getSuperstepNumber() == 1) {
 				// initialize vertex ranks
-				vertex.setValue(new Double(1.0 / numVertices));
+				vertex.setValue(1.0 / this.getNumberOfVertices());
 			}
 
 			for (Edge<K, Double> edge : getEdges()) {

http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
index 496e36d..165ef1e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -18,18 +18,15 @@
 
 package org.apache.flink.graph.spargel;
 
-import java.util.Iterator;
-import java.util.Map;
-
 import org.apache.flink.api.common.aggregators.Aggregator;
 import org.apache.flink.api.common.functions.FlatJoinFunction;
 import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.common.functions.RichCoGroupFunction;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.operators.CoGroupOperator;
 import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.operators.DeltaIteration;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.tuple.Tuple3;
 import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -40,9 +37,15 @@ import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.EdgeDirection;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.GraphUtils;
+import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
 /**
  * This class represents iterative graph computations, programmed in a scatter-gather perspective.
  * It is a special case of <i>Bulk Synchronous Parallel</i> computation.
@@ -151,11 +154,10 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		// check whether the numVertices option is set and, if so, compute the total number of vertices
 		// and set it within the messaging and update functions
 
+		DataSet<LongValue> numberOfVertices = null;
 		if (this.configuration != null && this.configuration.isOptNumVertices()) {
 			try {
-				long numberOfVertices = graph.numberOfVertices();
-				messagingFunction.setNumberOfVertices(numberOfVertices);
-				updateFunction.setNumberOfVertices(numberOfVertices);
+				numberOfVertices = GraphUtils.count(this.initialVertices);
 			} catch (Exception e) {
 				e.printStackTrace();
 			}
@@ -173,9 +175,9 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		// check whether the degrees option is set and, if so, compute the in and the out degrees and
 		// add them to the vertex value
 		if(this.configuration != null && this.configuration.isOptDegrees()) {
-			return createResultVerticesWithDegrees(graph, messagingDirection, messageTypeInfo);
+			return createResultVerticesWithDegrees(graph, messagingDirection, messageTypeInfo, numberOfVertices);
 		} else {
-			return createResultSimpleVertex(messagingDirection, messageTypeInfo);
+			return createResultSimpleVertex(messagingDirection, messageTypeInfo, numberOfVertices);
 		}
 	}
 
@@ -246,6 +248,10 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
 		@Override
 		public void open(Configuration parameters) throws Exception {
+			if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
+				Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
+				this.vertexUpdateFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+			}
 			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
 				this.vertexUpdateFunction.init(getIterationRuntimeContext());
 			}
@@ -368,10 +374,13 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		
 		@Override
 		public void open(Configuration parameters) throws Exception {
+			if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
+				Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
+				this.messagingFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+			}
 			if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
 				this.messagingFunction.init(getIterationRuntimeContext());
 			}
-			
 			this.messagingFunction.preSuperstep();
 		}
 		
@@ -459,7 +468,8 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	 */
 	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction(
 			DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration,
-			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg,
+			DataSet<LongValue> numberOfVertices) {
 
 		// build the messaging function (co group)
 		CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
@@ -475,6 +485,9 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
 				messages = messages.withBroadcastSet(e.f1, e.f0);
 			}
+			if (this.configuration.isOptNumVertices()) {
+				messages = messages.withBroadcastSet(numberOfVertices, "number of vertices");
+			}
 		}
 
 		return messages;
@@ -493,7 +506,8 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	 */
 	private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
 			DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration,
-			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+			TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg,
+			DataSet<LongValue> numberOfVertices) {
 
 		// build the messaging function (co group)
 		CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
@@ -510,6 +524,9 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 			for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
 				messages = messages.withBroadcastSet(e.f1, e.f0);
 			}
+			if (this.configuration.isOptNumVertices()) {
+				messages = messages.withBroadcastSet(numberOfVertices, "number of vertices");
+			}
 		}
 
 		return messages;
@@ -546,10 +563,11 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	 *
 	 * @param messagingDirection
 	 * @param messageTypeInfo
+	 * @param numberOfVertices
 	 * @return the operator
 	 */
 	private DataSet<Vertex<K, VV>> createResultSimpleVertex(EdgeDirection messagingDirection,
-		TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
+		TypeInformation<Tuple2<K, Message>> messageTypeInfo, DataSet<LongValue> numberOfVertices) {
 
 		DataSet<Tuple2<K, Message>> messages;
 
@@ -561,14 +579,14 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
 		switch (messagingDirection) {
 			case IN:
-				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0);
+				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices);
 				break;
 			case OUT:
-				messages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0);
+				messages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices);
 				break;
 			case ALL:
-				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0)
-						.union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0)) ;
+				messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices)
+						.union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
 				break;
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
@@ -581,6 +599,10 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		CoGroupOperator<?, ?, Vertex<K, VV>> updates =
 				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
 
+		if (this.configuration != null && this.configuration.isOptNumVertices()) {
+			updates = updates.withBroadcastSet(numberOfVertices, "number of vertices");
+		}
+
 		configureUpdateFunction(updates);
 
 		return iteration.closeWith(updates, updates);
@@ -593,11 +615,12 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 	 * @param graph
 	 * @param messagingDirection
 	 * @param messageTypeInfo
+	 * @param numberOfVertices
 	 * @return the operator
 	 */
 	@SuppressWarnings("serial")
 	private DataSet<Vertex<K, VV>> createResultVerticesWithDegrees(Graph<K, VV, EV> graph, EdgeDirection messagingDirection,
-			TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
+			TypeInformation<Tuple2<K, Message>> messageTypeInfo, DataSet<LongValue> numberOfVertices) {
 
 		DataSet<Tuple2<K, Message>> messages;
 
@@ -636,14 +659,14 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 
 		switch (messagingDirection) {
 			case IN:
-				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0);
+				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices);
 				break;
 			case OUT:
-				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0);
+				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices);
 				break;
 			case ALL:
-				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0)
-						.union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0)) ;
+				messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices)
+						.union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
 				break;
 			default:
 				throw new IllegalArgumentException("Illegal edge direction");
@@ -657,6 +680,10 @@ public class ScatterGatherIteration<K, VV, Message, EV>
 		CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
 				messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
 
+		if (this.configuration != null && this.configuration.isOptNumVertices()) {
+			updates = updates.withBroadcastSet(numberOfVertices, "number of vertices");
+		}
+
 		configureUpdateFunction(updates);
 
 		return iteration.closeWith(updates, updates).map(

http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 009d791..264479b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -19,12 +19,18 @@
 package org.apache.flink.graph.utils;
 
 import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
 import org.apache.flink.util.AbstractID;
 
+import static org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO;
+
 public class GraphUtils {
 
 	/**
@@ -50,4 +56,56 @@ public class GraphUtils {
 
 		return checksum;
 	}
+
+	/**
+	 * Count the number of elements in a DataSet.
+	 *
+	 * @param input DataSet of elements to be counted
+	 * @param <T> element type
+	 * @return count
+	 */
+	public static <T> DataSet<LongValue> count(DataSet<T> input) {
+		return input
+			.map(new MapTo<T, LongValue>(new LongValue(1)))
+				.returns(LONG_VALUE_TYPE_INFO)
+			.reduce(new AddLongValue());
+	}
+
+	/**
+	 * Map each element to a value.
+	 *
+	 * @param <I> input type
+	 * @param <O> output type
+	 */
+	public static class MapTo<I, O>
+	implements MapFunction<I, O> {
+		private final O value;
+
+		/**
+		 * Map each element to the given object.
+		 *
+		 * @param value the object to emit for each element
+		 */
+		public MapTo(O value) {
+			this.value = value;
+		}
+
+		@Override
+		public O map(I o) throws Exception {
+			return value;
+		}
+	}
+
+	/**
+	 * Add {@link LongValue} elements.
+	 */
+	public static class AddLongValue
+	implements ReduceFunction<LongValue> {
+		@Override
+		public LongValue reduce(LongValue value1, LongValue value2)
+				throws Exception {
+			value1.setValue(value1.getValue() + value2.getValue());
+			return value1;
+		}
+	}
 }


[2/2] flink git commit: [FLINK-3945] [gelly] Degree annotation for directed graphs

Posted by gr...@apache.org.
[FLINK-3945] [gelly] Degree annotation for directed graphs

This closes #2021


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65545c2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65545c2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65545c2e

Branch: refs/heads/master
Commit: 65545c2ed46c17df6abd85299b91bad2529cd42c
Parents: 36ad78c
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri May 20 12:54:16 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Jun 2 10:22:29 2016 -0400

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  55 +++-
 .../java/org/apache/flink/graph/EdgeOrder.java  |  47 ++++
 .../annotate/DegreeAnnotationFunctions.java     |  88 +------
 .../annotate/directed/EdgeDegreesPair.java      |  81 ++++++
 .../annotate/directed/EdgeSourceDegrees.java    |  75 ++++++
 .../annotate/directed/EdgeTargetDegrees.java    |  75 ++++++
 .../annotate/directed/VertexDegreePair.java     | 107 --------
 .../degree/annotate/directed/VertexDegrees.java | 262 +++++++++++++++++++
 .../annotate/directed/VertexInDegree.java       |   7 +-
 .../annotate/directed/VertexOutDegree.java      |   7 +-
 .../graph/asm/degree/annotate/package-info.java |  27 +-
 .../annotate/undirected/EdgeDegreePair.java     |   9 +-
 .../annotate/undirected/EdgeSourceDegree.java   |   4 +-
 .../annotate/undirected/EdgeTargetDegree.java   |   4 +-
 .../annotate/undirected/VertexDegree.java       |   3 +-
 .../annotate/directed/EdgeDegreesPairTest.java  |  66 +++++
 .../directed/EdgeSourceDegreesTest.java         |  66 +++++
 .../directed/EdgeTargetDegreesTest.java         |  66 +++++
 .../annotate/directed/VertexDegreePairTest.java |  87 ------
 .../annotate/directed/VertexDegreesTest.java    | 104 ++++++++
 .../annotate/directed/VertexInDegreeTest.java   |   7 +-
 .../annotate/directed/VertexOutDegreeTest.java  |   6 +-
 22 files changed, 947 insertions(+), 306 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index aadbd44..05fbcb5 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -2164,12 +2164,12 @@ DataSet<Vertex<K, LongValue>> outDegree = graph
     </tr>
 
     <tr>
-      <td>degree.annotate.directed.<br/><strong>VertexDegreePair</strong></td>
+      <td>degree.annotate.directed.<br/><strong>VertexDegrees</strong></td>
       <td>
-        <p>Annotate vertices of a <a href="#graph-representation">directed graph</a> with both the out-degree and in-degree.</p>
+        <p>Annotate vertices of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree.</p>
 {% highlight java %}
-DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> pairDegree = graph
-  .run(new VertexDegreePair()
+DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> degrees = graph
+  .run(new VertexDegrees()
     .setIncludeZeroDegreeVertices(true));
 {% endhighlight %}
         <p>Optional configuration:</p>
@@ -2181,6 +2181,51 @@ DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> pairDegree = graph
     </tr>
 
     <tr>
+      <td>degree.annotate.directed.<br/><strong>EdgeSourceDegrees</strong></td>
+      <td>
+        <p>Annotate edges of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree of the source ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, Degrees>>> sourceDegrees = graph
+  .run(new EdgeSourceDegrees());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>EdgeTargetDegrees</strong></td>
+      <td>
+        <p>Annotate edges of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree of the target ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, Degrees>>> targetDegrees = graph
+  .run(new EdgeTargetDegrees();
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>degree.annotate.directed.<br/><strong>EdgeDegreesPair</strong></td>
+      <td>
+        <p>Annotate edges of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree of both the source and target vertices.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, Degrees>>> degrees = graph
+  .run(new EdgeDegreesPair());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
       <td>degree.annotate.undirected.<br/><strong>VertexDegree</strong></td>
       <td>
         <p>Annotate vertices of an <a href="#graph-representation">undirected graph</a> with the degree.</p>
@@ -2236,7 +2281,7 @@ DataSet<Edge<K, Tuple2<EV, LongValue>>> targetDegree = graph
     <tr>
       <td>degree.annotate.undirected.<br/><strong>EdgeDegreePair</strong></td>
       <td>
-        <p>Annotate edges of an <a href="#graph-representation">undirected graph</a> with the degree of both the source and target degree ID.</p>
+        <p>Annotate edges of an <a href="#graph-representation">undirected graph</a> with the degree of both the source and target vertices.</p>
 {% highlight java %}
 DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = graph
   .run(new EdgeDegreePair()

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java
new file mode 100644
index 0000000..08e955a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+/**
+ * These bitmasks are used by edge-flipping algorithms to mark the edge order
+ * relative to the original edge direction.
+ */
+public enum EdgeOrder {
+
+	FORWARD(0b01),
+	REVERSE(0b10),
+	MUTUAL(0b11);
+
+	private final byte bitmask;
+
+	EdgeOrder(int bitmask) {
+		this.bitmask = (byte)bitmask;
+	}
+
+	/**
+	 * Returns a bitmask used for marking whether an edge is in the same
+	 * direction as in the original edge set (FORWARD), is flipped relative
+	 * to the original edge set (REVERSE), or both (MUTUAL).
+	 *
+	 * @return edge order bitmask
+	 */
+	public byte getBitmask() {
+		return bitmask;
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
index 098e9fe..b91b4cb 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
@@ -98,7 +98,7 @@ public class DegreeAnnotationFunctions {
 	 */
 	@ForwardedFieldsFirst("0")
 	@ForwardedFieldsSecond("0")
-	public static final class JoinVertexWithVertexDegree<K, VV>
+	public static class JoinVertexWithVertexDegree<K, VV>
 	implements JoinFunction<Vertex<K, VV>, Vertex<K, LongValue>, Vertex<K, LongValue>> {
 		private LongValue zero = new LongValue(0);
 
@@ -114,68 +114,6 @@ public class DegreeAnnotationFunctions {
 		}
 	}
 
-	/**
-	 * Performs a left outer join to apply a zero count for vertices with
-	 * out- and in-degree of zero.
-	 *
-	 * @param <K> ID type
-	 * @param <VV> vertex value type
-	 */
-	@ForwardedFieldsFirst("0")
-	@ForwardedFieldsSecond("0")
-	public static final class JoinVertexWithVertexDegrees<K, VV>
-	implements JoinFunction<Vertex<K, VV>, Vertex<K, Tuple2<LongValue, LongValue>>, Vertex<K, Tuple2<LongValue, LongValue>>> {
-		private Tuple2<LongValue, LongValue> zeros;
-
-		private Vertex<K, Tuple2<LongValue, LongValue>> output = new Vertex<>();
-
-		public JoinVertexWithVertexDegrees() {
-			LongValue zero = new LongValue(0);
-			zeros = new Tuple2<>(zero, zero);
-		}
-		@Override
-		public Vertex<K, Tuple2<LongValue, LongValue>> join(Vertex<K, VV> vertex, Vertex<K, Tuple2<LongValue, LongValue>> vertexDegree)
-				throws Exception {
-			output.f0 = vertex.f0;
-			output.f1 = (vertexDegree == null) ? zeros : vertexDegree.f1;
-
-			return output;
-		}
-	}
-
-	/**
-	 * Performs a full outer join composing vertex out- and in-degree and
-	 * applying a zero count for vertices having an out- or in-degree of zero.
-	 *
-	 * @param <K> ID type
-	 */
-	@ForwardedFieldsFirst("0")
-	@ForwardedFieldsSecond("0")
-	public static final class JoinVertexDegreeWithVertexDegree<K>
-	implements JoinFunction<Vertex<K, LongValue>, Vertex<K, LongValue>, Vertex<K, Tuple2<LongValue, LongValue>>> {
-		private LongValue zero = new LongValue(0);
-
-		private Tuple2<LongValue, LongValue> degrees = new Tuple2<>();
-
-		private Vertex<K, Tuple2<LongValue, LongValue>> output = new Vertex<>(null, degrees);
-
-		@Override
-		public Vertex<K, Tuple2<LongValue, LongValue>> join(Vertex<K, LongValue> left, Vertex<K, LongValue> right)
-				throws Exception {
-			if (left == null) {
-				output.f0 = right.f0;
-				degrees.f0 = zero;
-				degrees.f1 = right.f1;
-			} else {
-				output.f0 = left.f0;
-				degrees.f0 = left.f1;
-				degrees.f1 = (right == null) ? zero : right.f1;
-			}
-
-			return output;
-		}
-	}
-
 	// --------------------------------------------------------------------------------------------
 	//  Edge functions
 	// --------------------------------------------------------------------------------------------
@@ -185,17 +123,18 @@ public class DegreeAnnotationFunctions {
 	 *
 	 * @param <K> ID type
 	 * @param <EV> edge value type
+	 * @param <D> degree type
 	 */
 	@ForwardedFieldsFirst("0; 1; 2->2.0")
 	@ForwardedFieldsSecond("0; 1->2.1")
-	public static final class JoinEdgeWithVertexDegree<K, EV>
-	implements JoinFunction<Edge<K, EV>, Vertex<K, LongValue>, Edge<K, Tuple2<EV, LongValue>>> {
-		private Tuple2<EV, LongValue> valueAndDegree = new Tuple2<>();
+	public static class JoinEdgeWithVertexDegree<K, EV, D>
+	implements JoinFunction<Edge<K, EV>, Vertex<K, D>, Edge<K, Tuple2<EV, D>>> {
+		private Tuple2<EV, D> valueAndDegree = new Tuple2<>();
 
-		private Edge<K, Tuple2<EV, LongValue>> output = new Edge<>(null, null, valueAndDegree);
+		private Edge<K, Tuple2<EV, D>> output = new Edge<>(null, null, valueAndDegree);
 
 		@Override
-		public Edge<K, Tuple2<EV, LongValue>> join(Edge<K, EV> edge, Vertex<K, LongValue> vertex) throws Exception {
+		public Edge<K, Tuple2<EV, D>> join(Edge<K, EV> edge, Vertex<K, D> vertex) throws Exception {
 			output.f0 = edge.f0;
 			output.f1 = edge.f1;
 			valueAndDegree.f0 = edge.f2;
@@ -210,19 +149,20 @@ public class DegreeAnnotationFunctions {
 	 *
 	 * @param <K> ID type
 	 * @param <EV> edge value type
+	 * @param <D> degree type
 	 */
 	@ForwardedFieldsFirst("0; 1; 2.0; 2.1")
 	@ForwardedFieldsSecond("0; 1->2.2")
-	public static final class JoinEdgeDegreeWithVertexDegree<K, EV>
-	implements JoinFunction<Edge<K, Tuple2<EV, LongValue>>, Vertex<K, LongValue>, Edge<K, Tuple3<EV, LongValue, LongValue>>> {
-		private Tuple3<EV, LongValue, LongValue> valueAndDegrees = new Tuple3<>();
+	public static class JoinEdgeDegreeWithVertexDegree<K, EV, D>
+	implements JoinFunction<Edge<K, Tuple2<EV, D>>, Vertex<K, D>, Edge<K, Tuple3<EV, D, D>>> {
+		private Tuple3<EV, D, D> valueAndDegrees = new Tuple3<>();
 
-		private Edge<K, Tuple3<EV, LongValue, LongValue>> output = new Edge<>(null, null, valueAndDegrees);
+		private Edge<K, Tuple3<EV, D, D>> output = new Edge<>(null, null, valueAndDegrees);
 
 		@Override
-		public Edge<K, Tuple3<EV, LongValue, LongValue>> join(Edge<K, Tuple2<EV, LongValue>> edge, Vertex<K, LongValue> vertex)
+		public Edge<K, Tuple3<EV, D, D>> join(Edge<K, Tuple2<EV, D>> edge, Vertex<K, D> vertex)
 				throws Exception {
-			Tuple2<EV, LongValue> valueAndDegree = edge.f2;
+			Tuple2<EV, D> valueAndDegree = edge.f2;
 
 			output.f0 = edge.f0;
 			output.f1 = edge.f1;

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
new file mode 100644
index 0000000..5aa5ca4
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of both the source and target vertices.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeDegreesPair<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> {
+
+	// Optional configuration
+	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// s, t, d(s)
+		DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input
+			.run(new EdgeSourceDegrees<K, VV, EV>()
+				.setParallelism(parallelism));
+
+		// t, d(t)
+		DataSet<Vertex<K, Degrees>> vertexDegrees = input
+			.run(new VertexDegrees<K, VV, EV>()
+				.setParallelism(parallelism));
+
+		// s, t, (d(s), d(t))
+		return edgeSourceDegrees
+			.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
+			.where(1)
+			.equalTo(0)
+			.with(new JoinEdgeDegreeWithVertexDegree<K, EV, Degrees>())
+				.setParallelism(parallelism)
+				.name("Edge target degree");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
new file mode 100644
index 0000000..85ba0ed
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of the source vertex.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeSourceDegrees<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
+
+	// Optional configuration
+	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public EdgeSourceDegrees<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// s, d(s)
+		DataSet<Vertex<K, Degrees>> vertexDegrees = input
+			.run(new VertexDegrees<K, VV, EV>()
+				.setParallelism(parallelism));
+
+		// s, t, d(s)
+		return input.getEdges()
+			.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
+			.where(0)
+			.equalTo(0)
+			.with(new JoinEdgeWithVertexDegree<K, EV, Degrees>())
+				.setParallelism(parallelism)
+				.name("Edge source degrees");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
new file mode 100644
index 0000000..6d72e44
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of the target vertex.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeTargetDegrees<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
+
+	// Optional configuration
+	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public EdgeTargetDegrees<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// t, d(t)
+		DataSet<Vertex<K, Degrees>> vertexDegrees = input
+			.run(new VertexDegrees<K, VV, EV>()
+				.setParallelism(parallelism));
+
+		// s, t, d(t)
+		return input.getEdges()
+			.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
+			.where(1)
+			.equalTo(0)
+			.with(new JoinEdgeWithVertexDegree<K, EV, Degrees>())
+				.setParallelism(parallelism)
+				.name("Edge target degrees");
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java
deleted file mode 100644
index 3e2cae0..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.degree.annotate.directed;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexDegreeWithVertexDegree;
-import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegrees;
-import org.apache.flink.types.LongValue;
-
-/**
- * Annotates vertices of a directed graph with both the out-degree and in-degree.
- *
- * @param <K> ID type
- * @param <VV> vertex value type
- * @param <EV> edge value type
- */
-public class VertexDegreePair<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<LongValue, LongValue>>>> {
-
-	// Optional configuration
-	private boolean includeZeroDegreeVertices = false;
-
-	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
-
-	/**
-	 * By default only the edge set is processed for the computation of degree.
-	 * When this flag is set an additional join is performed against the vertex
-	 * set in order to output vertices with out- and in-degree of zero.
-	 *
-	 * @param includeZeroDegreeVertices whether to output vertices with out-
-	 *                                  and in-degree of zero
-	 * @return this
-	 */
-	public VertexDegreePair<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
-		this.includeZeroDegreeVertices = includeZeroDegreeVertices;
-
-		return this;
-	}
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public VertexDegreePair<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
-	@Override
-	public DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> run(Graph<K, VV, EV> input)
-			throws Exception {
-		// s, deg(s)
-		DataSet<Vertex<K, LongValue>> outDegree = input
-			.run(new VertexOutDegree<K, VV, EV>()
-				.setIncludeZeroDegreeVertices(false));
-
-		// t, deg(t)
-		DataSet<Vertex<K, LongValue>> inDegree = input
-			.run(new VertexInDegree<K, VV, EV>()
-				.setIncludeZeroDegreeVertices(false));
-
-		DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> degree = outDegree
-			.fullOuterJoin(inDegree)
-			.where(0)
-			.equalTo(0)
-			.with(new JoinVertexDegreeWithVertexDegree<K>())
-				.setParallelism(parallelism)
-				.name("Join out- and in-degree");
-
-		if (includeZeroDegreeVertices) {
-			degree = input
-				.getVertices()
-				.leftOuterJoin(degree)
-				.where(0)
-				.equalTo(0)
-				.with(new JoinVertexWithVertexDegrees<K, VV>())
-					.setParallelism(parallelism)
-					.name("Join zero degree vertices");
-		}
-
-		return degree;
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
new file mode 100644
index 0000000..aab0eb6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeOrder;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * Annotates vertices of a directed graph with the degree, out-, and in-degree.
+ *
+ * @param <K> graph label type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class VertexDegrees<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
+
+	// Optional configuration
+	private boolean includeZeroDegreeVertices = false;
+
+	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+	/**
+	 * By default only the edge set is processed for the computation of degree.
+	 * When this flag is set an additional join is performed against the vertex
+	 * set in order to output vertices with an in-degree of zero.
+	 *
+	 * @param includeZeroDegreeVertices whether to output vertices with an
+	 *                                  in-degree of zero
+	 * @return this
+	 */
+	public VertexDegrees<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
+		this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+
+		return this;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public VertexDegrees<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Degrees>> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// s, t, bitmask
+		DataSet<Tuple3<K, K, ByteValue>> edgesWithOrder = input.getEdges()
+			.flatMap(new EmitAndFlipEdge<K, EV>())
+				.setParallelism(parallelism)
+				.name("Emit and flip edge")
+			.groupBy(0, 1)
+				.reduce(new ReduceBitmask<K>())
+				.setParallelism(parallelism)
+				.name("Reduce bitmask");
+
+		// s, d(s)
+		DataSet<Vertex<K, Degrees>> vertexDegrees = edgesWithOrder
+			.groupBy(0)
+			.sortGroup(1, Order.ASCENDING)
+			.reduceGroup(new DegreeCount<K>())
+				.setParallelism(parallelism)
+				.name("Degree count");
+
+		if (includeZeroDegreeVertices) {
+			vertexDegrees = input.getVertices()
+				.leftOuterJoin(vertexDegrees)
+				.where(0)
+				.equalTo(0)
+				.with(new JoinVertexWithVertexDegrees<K, VV>())
+					.setParallelism(parallelism)
+					.name("Join zero degree vertices");
+		}
+
+		return vertexDegrees;
+	}
+
+	/**
+	 * Emit each vertex both forward and reversed with the associated bitmask.
+	 *
+	 * @param <T> ID type
+	 * @param <TV> vertex value type
+	 */
+	private static class EmitAndFlipEdge<T, TV>
+	implements FlatMapFunction<Edge<T, TV>, Tuple3<T, T, ByteValue>> {
+		private Tuple3<T, T, ByteValue> forward = new Tuple3<>(null, null, new ByteValue(EdgeOrder.FORWARD.getBitmask()));
+
+		private Tuple3<T, T, ByteValue> reverse = new Tuple3<>(null, null, new ByteValue(EdgeOrder.REVERSE.getBitmask()));
+
+		@Override
+		public void flatMap(Edge<T, TV> value, Collector<Tuple3<T, T, ByteValue>> out)
+				throws Exception {
+			forward.f0 = value.f0;
+			forward.f1 = value.f1;
+			out.collect(forward);
+
+			reverse.f0 = value.f1;
+			reverse.f1 = value.f0;
+			out.collect(reverse);
+		}
+	}
+
+	/**
+	 * Combine mutual edges.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class ReduceBitmask<T>
+	implements ReduceFunction<Tuple3<T, T, ByteValue>> {
+		@Override
+		public Tuple3<T, T, ByteValue> reduce(Tuple3<T, T, ByteValue> left, Tuple3<T, T, ByteValue> right)
+				throws Exception {
+			left.f2.setValue((byte)(left.f2.getValue() | right.f2.getValue()));
+			return left;
+		}
+	}
+
+	/**
+	 * Sum vertex degree by counting over mutual, out-, and in-edges.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class DegreeCount<T>
+	implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Vertex<T, Degrees>> {
+		private Vertex<T, Degrees> output = new Vertex<>(null, new Degrees());
+
+		@Override
+		public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Vertex<T, Degrees>> out)
+				throws Exception {
+			long degree = 0;
+			long outDegree = 0;
+			long inDegree = 0;
+
+			for (Tuple3<T, T, ByteValue> edge : values) {
+				output.f0 = edge.f0;
+
+				byte bitmask = edge.f2.getValue();
+
+				degree++;
+
+				if (bitmask == EdgeOrder.FORWARD.getBitmask()) {
+					outDegree++;
+				} else if (bitmask == EdgeOrder.REVERSE.getBitmask()) {
+					inDegree++;
+				} else {
+					outDegree++;
+					inDegree++;
+				}
+			}
+
+			output.f1.getDegree().setValue(degree);
+			output.f1.getOutDegree().setValue(outDegree);
+			output.f1.getInDegree().setValue(inDegree);
+
+			out.collect(output);
+		}
+	}
+
+	/**
+	 * Performs a left outer join to apply a zero count for vertices with
+	 * out- and in-degree of zero.
+	 *
+	 * @param <T> ID type
+	 * @param <TV> vertex value type
+	 */
+	@ForwardedFieldsFirst("0")
+	@ForwardedFieldsSecond("0")
+	private static class JoinVertexWithVertexDegrees<T, TV>
+	implements JoinFunction<Vertex<T, TV>, Vertex<T, Degrees>, Vertex<T, Degrees>> {
+		private Vertex<T, Degrees> output = new Vertex<>(null, new Degrees());
+
+		@Override
+		public Vertex<T, Degrees> join(Vertex<T, TV> vertex, Vertex<T, Degrees> vertexDegree)
+				throws Exception {
+			if (vertexDegree == null) {
+				output.f0 = vertex.f0;
+				return output;
+			} else {
+				return vertexDegree;
+			}
+		}
+	}
+
+	/**
+	 * Wraps the vertex degree, out-degree, and in-degree.
+	 */
+	public static class Degrees
+	extends Tuple3<LongValue, LongValue, LongValue> {
+		private static final int HASH_SEED = 0x3a12fc31;
+
+		private Murmur3_32 hasher = new Murmur3_32(HASH_SEED);
+
+		public Degrees() {
+			this(new LongValue(), new LongValue(), new LongValue());
+		}
+
+		public Degrees(LongValue value0, LongValue value1, LongValue value2) {
+			super(value0, value1, value2);
+		}
+
+		public LongValue getDegree() {
+			return f0;
+		}
+
+		public LongValue getOutDegree() {
+			return f1;
+		}
+
+		public LongValue getInDegree() {
+			return f2;
+		}
+
+		@Override
+		public int hashCode() {
+			return hasher.reset()
+				.hash(f0.getValue())
+				.hash(f1.getValue())
+				.hash(f2.getValue())
+				.hash();
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 5825628..00acedb 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -39,7 +39,7 @@ public class VertexInDegree<K, VV, EV>
 implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 
 	// Optional configuration
-	private boolean includeZeroDegreeVertices = true;
+	private boolean includeZeroDegreeVertices = false;
 
 	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
 
@@ -80,7 +80,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 				.setParallelism(parallelism)
 				.name("Map edge to target ID");
 
-		// t, deg(t)
+		// t, d(t)
 		DataSet<Vertex<K, LongValue>> targetDegree = targetIds
 			.groupBy(0)
 			.reduce(new DegreeCount<K>())
@@ -88,8 +88,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 				.name("Degree count");
 
 		if (includeZeroDegreeVertices) {
-			targetDegree = input
-				.getVertices()
+			targetDegree = input.getVertices()
 				.leftOuterJoin(targetDegree)
 				.where(0)
 				.equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index 2676104..bcca19d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -39,7 +39,7 @@ public class VertexOutDegree<K, VV, EV>
 implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 
 	// Optional configuration
-	private boolean includeZeroDegreeVertices = true;
+	private boolean includeZeroDegreeVertices = false;
 
 	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
 
@@ -80,7 +80,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 				.setParallelism(parallelism)
 				.name("Map edge to source ID");
 
-		// s, deg(s)
+		// s, d(s)
 		DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
 			.groupBy(0)
 			.reduce(new DegreeCount<K>())
@@ -88,8 +88,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 				.name("Degree count");
 
 		if (includeZeroDegreeVertices) {
-			sourceDegree = input
-				.getVertices()
+			sourceDegree = input.getVertices()
 				.leftOuterJoin(sourceDegree)
 				.where(0)
 				.equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java
index f25fd90..a138b1a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java
@@ -24,18 +24,25 @@
  * equivalent to the out-degree.
  *
  * The undirected graph algorithms are:
- *   {@code VertexDegree}     annotates vertices as <v, deg(v)>
- *   {@code EdgeSourceDegree} annotates edges as <s, t, deg(s)>
- *   {@code EdgeTargetDegree} annotates edges as <s, t, deg(t)>
- *   {@code EdgeDegreePair}   annotates edges as <s, t, (deg(s), deg(t))>
+ *   {@code VertexDegree}      annotates vertices as <v, deg(v)>
+ *   {@code EdgeSourceDegree}  annotates edges as <s, t, (EV, deg(s))>
+ *   {@code EdgeTargetDegree}  annotates edges as <s, t, (EV, deg(t))>
+ *   {@code EdgeDegreePair}    annotates edges as <s, t, (EV, deg(s), deg(t))>
  *
  * The directed graph algorithms are:
- *   {@code VertexOutDegree}  annotates vertices as <v, out(v)>
- *   {@code VertexInDegree}   annotates vertices as <v, in(v)>
- *   {@code VertexDegreePair} annotates vertices as <v, (out(v), in(v))>
+ *   {@code VertexDegrees}     annotates vertices as <v, (deg(v), out(v), in(v))>
+ *   {@code VertexOutDegree}   annotates vertices as <v, out(v)>
+ *   {@code VertexInDegree}    annotates vertices as <v, in(v)>
+ *   {@code EdgeSourceDegrees} annotates edges as <s, t, (deg(s), out(s), in(s))>
+ *   {@code EdgeTargetDegrees} annotates edges as <s, t, (deg(t), out(t), in(t))>
+ *   {@code EdgeDegreesPair}   annotates edges as <s, t, ((deg(s), out(s), in(s)), (deg(t), out(t), in(t)))>
  *
- * A directed graph edge has four possible degrees: source out- and in-degree
- * and target out- and in-degree. This gives 2^4 - 1 = 15 ways to annotate
- * a directed edge.
+ * where:
+ *   EV is the original edge value
+ *   deg(x) is the number of vertex neighbors
+ *   out(x) is the number of vertex neighbors connected by an out-edge
+ *   in(x) is the number of vertex neighbors connected by an in-edge
+ *
+ * (out(x) + in(x)) / 2 <= deg(x) <= out(x) + in(x)
  */
 package org.apache.flink.graph.asm.degree.annotate;

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index 698a46a..8cc2e08 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -31,7 +31,8 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Join
 import org.apache.flink.types.LongValue;
 
 /**
- * Annotates edges of an undirected graph with the degree of both the source and target degree ID.
+ * Annotates edges of an undirected graph with the degree of both the source
+ * and target degree vertices.
  *
  * @param <K> ID type
  * @param <VV> vertex value type
@@ -75,7 +76,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV
 	@Override
 	public DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> run(Graph<K, VV, EV> input)
 			throws Exception {
-		// s, t, deg(s)
+		// s, t, d(s)
 		DataSet<Edge<K, Tuple2<EV, LongValue>>> edgeSourceDegrees = input
 			.run(new EdgeSourceDegree<K, VV, EV>()
 				.setReduceOnTargetId(reduceOnTargetId)
@@ -87,12 +88,12 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV
 				.setReduceOnTargetId(reduceOnTargetId)
 				.setParallelism(parallelism));
 
-		// s, t, (deg(s), deg(t))
+		// s, t, (d(s), d(t))
 		return edgeSourceDegrees
 			.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
 			.where(1)
 			.equalTo(0)
-			.with(new JoinEdgeDegreeWithVertexDegree<K,EV>())
+			.with(new JoinEdgeDegreeWithVertexDegree<K, EV, LongValue>())
 				.setParallelism(parallelism)
 				.name("Edge target degree");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index 6f64bc1..b9b59c5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Join
 import org.apache.flink.types.LongValue;
 
 /**
- * Annotates edges of an undirected graph with degree of the source ID.
+ * Annotates edges of an undirected graph with degree of the source vertex.
  *
  * @param <K> ID type
  * @param <VV> vertex value type
@@ -85,7 +85,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
 			.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
 			.where(0)
 			.equalTo(0)
-			.with(new JoinEdgeWithVertexDegree<K, EV>())
+			.with(new JoinEdgeWithVertexDegree<K, EV, LongValue>())
 				.setParallelism(parallelism)
 				.name("Edge source degree");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index e9505bc..eabfee7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Join
 import org.apache.flink.types.LongValue;
 
 /**
- * Annotates edges of an undirected graph with degree of the target ID.
+ * Annotates edges of an undirected graph with degree of the target vertex.
  *
  * @param <K> ID type
  * @param <VV> vertex value type
@@ -85,7 +85,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
 			.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
 			.where(1)
 			.equalTo(0)
-			.with(new JoinEdgeWithVertexDegree<K, EV>())
+			.with(new JoinEdgeWithVertexDegree<K, EV, LongValue>())
 				.setParallelism(parallelism)
 				.name("Edge target degree");
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index 518afaa..61a5e82 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -111,8 +111,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 				.name("Degree count");
 
 		if (includeZeroDegreeVertices) {
-			degree = input
-				.getVertices()
+			degree = input.getVertices()
 				.leftOuterJoin(degree)
 				.where(0)
 				.equalTo(0)

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
new file mode 100644
index 0000000..3fcb9dd
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EdgeDegreesPairTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		String expectedResult =
+			"(0,1,((null),(2,2,0),(3,2,1)))\n" +
+			"(0,2,((null),(2,2,0),(3,1,2)))\n" +
+			"(1,2,((null),(3,2,1),(3,1,2)))\n" +
+			"(1,3,((null),(3,2,1),(4,2,2)))\n" +
+			"(2,3,((null),(3,1,2),(4,2,2)))\n" +
+			"(3,4,((null),(4,2,2),(1,0,1)))\n" +
+			"(3,5,((null),(4,2,2),(1,0,1)))";
+
+		DataSet<Edge<IntValue, Tuple3<NullValue, Degrees, Degrees>>> degrees = directedSimpleGraph
+			.run(new EdgeDegreesPair<IntValue, NullValue, NullValue>());
+
+		TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		ChecksumHashCode degreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+			.run(new EdgeDegreesPair<LongValue, NullValue, NullValue>()));
+
+		assertEquals(16384, degreeChecksum.getCount());
+		assertEquals(0x00001f68dfabd17cL, degreeChecksum.getChecksum());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
new file mode 100644
index 0000000..2b22eea
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EdgeSourceDegreesTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		String expectedResult =
+			"(0,1,((null),(2,2,0)))\n" +
+			"(0,2,((null),(2,2,0)))\n" +
+			"(1,2,((null),(3,2,1)))\n" +
+			"(1,3,((null),(3,2,1)))\n" +
+			"(2,3,((null),(3,1,2)))\n" +
+			"(3,4,((null),(4,2,2)))\n" +
+			"(3,5,((null),(4,2,2)))";
+
+		DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> degrees = directedSimpleGraph
+				.run(new EdgeSourceDegrees<IntValue, NullValue, NullValue>());
+
+		TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+			.run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>()));
+
+		assertEquals(16384, sourceDegreeChecksum.getCount());
+		assertEquals(0x00001ec53bd55136L, sourceDegreeChecksum.getChecksum());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
new file mode 100644
index 0000000..6840dc5
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EdgeTargetDegreesTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		String expectedResult =
+			"(0,1,((null),(3,2,1)))\n" +
+			"(0,2,((null),(3,1,2)))\n" +
+			"(1,2,((null),(3,1,2)))\n" +
+			"(1,3,((null),(4,2,2)))\n" +
+			"(2,3,((null),(4,2,2)))\n" +
+			"(3,4,((null),(1,0,1)))\n" +
+			"(3,5,((null),(1,0,1)))";
+
+		DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> degrees = directedSimpleGraph
+				.run(new EdgeTargetDegrees<IntValue, NullValue, NullValue>());
+
+		TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+			.run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>()));
+
+		assertEquals(16384, targetDegreeChecksum.getCount());
+		assertEquals(0x00001f2867ba8b4fL, targetDegreeChecksum.getChecksum());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java
deleted file mode 100644
index 9400532..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.degree.annotate.directed;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.asm.AsmTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class VertexDegreePairTest
-extends AsmTestBase {
-
-	@Test
-	public void testWithSimpleGraph()
-			throws Exception {
-		DataSet<Vertex<IntValue, Tuple2<LongValue, LongValue>>> vertexDegrees = directedSimpleGraph
-			.run(new VertexDegreePair<IntValue, NullValue, NullValue>());
-
-		String expectedResult =
-			"(0,(2,0))\n" +
-			"(1,(2,1))\n" +
-			"(2,(1,2))\n" +
-			"(3,(2,2))\n" +
-			"(4,(0,1))\n" +
-			"(5,(0,1))";
-
-		TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
-	}
-
-	@Test
-	public void testWithEmptyGraph()
-			throws Exception {
-		DataSet<Vertex<LongValue, Tuple2<LongValue, LongValue>>> vertexDegrees;
-
-		vertexDegrees = emptyGraph
-			.run(new VertexDegreePair<LongValue, NullValue, NullValue>()
-				.setIncludeZeroDegreeVertices(false));
-
-		assertEquals(0, vertexDegrees.collect().size());
-
-		vertexDegrees = emptyGraph
-			.run(new VertexDegreePair<LongValue, NullValue, NullValue>()
-				.setIncludeZeroDegreeVertices(true));
-
-		String expectedResult =
-			"(0,(0,0))\n" +
-			"(1,(0,0))\n" +
-			"(2,(0,0))";
-
-		TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
-	}
-
-	@Test
-	public void testWithRMatGraph()
-	throws Exception {
-		ChecksumHashCode degreePairChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
-			.run(new VertexDegreePair<LongValue, NullValue, NullValue>()));
-
-		assertEquals(902, degreePairChecksum.getCount());
-		assertEquals(0x0000000000fc025aL, degreePairChecksum.getChecksum());
-	}
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
new file mode 100644
index 0000000..a0697a2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class VertexDegreesTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleDirectedGraph()
+			throws Exception {
+		DataSet<Vertex<IntValue, Degrees>> vertexDegrees = directedSimpleGraph
+			.run(new VertexDegrees<IntValue, NullValue, NullValue>());
+
+		String expectedResult =
+			"(0,(2,2,0))\n" +
+			"(1,(3,2,1))\n" +
+			"(2,(3,1,2))\n" +
+			"(3,(4,2,2))\n" +
+			"(4,(1,0,1))\n" +
+			"(5,(1,0,1))";
+
+		TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+	}
+
+	@Test
+	public void testWithSimpleUndirectedGraph()
+			throws Exception {
+		DataSet<Vertex<IntValue, Degrees>> vertexDegrees = undirectedSimpleGraph
+			.run(new VertexDegrees<IntValue, NullValue, NullValue>());
+
+		String expectedResult =
+			"(0,(2,2,2))\n" +
+			"(1,(3,3,3))\n" +
+			"(2,(3,3,3))\n" +
+			"(3,(4,4,4))\n" +
+			"(4,(1,1,1))\n" +
+			"(5,(1,1,1))";
+
+		TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+	}
+
+	@Test
+	public void testWithEmptyGraph()
+			throws Exception {
+		DataSet<Vertex<LongValue, Degrees>> vertexDegrees;
+
+		vertexDegrees = emptyGraph
+			.run(new VertexDegrees<LongValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(false));
+
+		assertEquals(0, vertexDegrees.collect().size());
+
+		vertexDegrees = emptyGraph
+			.run(new VertexDegrees<LongValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(true));
+
+		String expectedResult =
+			"(0,(0,0,0))\n" +
+			"(1,(0,0,0))\n" +
+			"(2,(0,0,0))";
+
+		TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+	throws Exception {
+		ChecksumHashCode degreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+			.run(new VertexDegrees<LongValue, NullValue, NullValue>()));
+
+		assertEquals(902, degreeChecksum.getCount());
+		assertEquals(0x0000015384f40cb6L, degreeChecksum.getChecksum());
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
index 577e675..0fa0fe5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.graph.asm.degree.annotate.directed;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.AsmTestBase;
@@ -39,7 +38,8 @@ extends AsmTestBase {
 	public void testWithSimpleGraph()
 			throws Exception {
 		DataSet<Vertex<IntValue, LongValue>> vertexDegrees = directedSimpleGraph
-			.run(new VertexInDegree<IntValue, NullValue, NullValue>());
+			.run(new VertexInDegree<IntValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(true));
 
 		String expectedResult =
 			"(0,0)\n" +
@@ -79,7 +79,8 @@ extends AsmTestBase {
 	public void testWithRMatGraph()
 			throws Exception {
 		ChecksumHashCode inDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
-			.run(new VertexInDegree<LongValue, NullValue, NullValue>()));
+			.run(new VertexInDegree<LongValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(true)));
 
 		assertEquals(902, inDegreeChecksum.getCount());
 		assertEquals(0x0000000000e1e99cL, inDegreeChecksum.getChecksum());

http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
index b5e9ce8..f7f3d48 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
@@ -38,7 +38,8 @@ extends AsmTestBase {
 	public void testWithSimpleGraph()
 			throws Exception {
 		DataSet<Vertex<IntValue, LongValue>> vertexDegrees = directedSimpleGraph
-			.run(new VertexOutDegree<IntValue, NullValue, NullValue>());
+			.run(new VertexOutDegree<IntValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(true));
 
 		String expectedResult =
 			"(0,2)\n" +
@@ -78,7 +79,8 @@ extends AsmTestBase {
 	public void testWithRMatGraph()
 			throws Exception {
 		ChecksumHashCode outDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
-			.run(new VertexOutDegree<LongValue, NullValue, NullValue>()));
+			.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
+				.setIncludeZeroDegreeVertices(true)));
 
 		assertEquals(902, outDegreeChecksum.getCount());
 		assertEquals(0x0000000000e1e99cL, outDegreeChecksum.getChecksum());