You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/06/02 14:27:39 UTC
[1/2] flink git commit: [FLINK-3806] [gelly] Revert use of DataSet.count()
Repository: flink
Updated Branches:
refs/heads/master b201f8664 -> 65545c2ed
[FLINK-3806] [gelly] Revert use of DataSet.count()
This closes #2036
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/36ad78c0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/36ad78c0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/36ad78c0
Branch: refs/heads/master
Commit: 36ad78c0821fdae0a69371c67602dd2a7955e4a8
Parents: b201f86
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed May 25 11:06:01 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Jun 2 09:11:19 2016 -0400
----------------------------------------------------------------------
docs/apis/batch/libs/gelly.md | 1 -
.../graph/library/HITSAlgorithmITCase.java | 28 --------
.../flink/graph/library/PageRankITCase.java | 15 ++--
.../graph/gsa/GatherSumApplyIteration.java | 31 +++++++--
.../apache/flink/graph/library/GSAPageRank.java | 52 +++-----------
.../flink/graph/library/HITSAlgorithm.java | 54 ++-------------
.../apache/flink/graph/library/PageRank.java | 55 ++++-----------
.../graph/spargel/ScatterGatherIteration.java | 73 ++++++++++++++------
.../apache/flink/graph/utils/GraphUtils.java | 58 ++++++++++++++++
9 files changed, 171 insertions(+), 196 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index 0d3e594..aadbd44 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -1967,7 +1967,6 @@ The constructors take the following parameters:
* `beta`: the damping factor.
* `maxIterations`: the maximum number of iterations to run.
-* `numVertices`: the number of vertices in the input. If known beforehand, is it advised to provide this argument to speed up execution.
### GSA PageRank
http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java
index 019b851..1887725 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/HITSAlgorithmITCase.java
@@ -56,20 +56,6 @@ public class HITSAlgorithmITCase extends MultipleProgramsTestBase{
}
@Test
- public void testHITSWithTenIterationsAndNumOfVertices() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Double, NullValue> graph = Graph.fromDataSet(
- HITSData.getVertexDataSet(env),
- HITSData.getEdgeDataSet(env),
- env);
-
- List<Vertex<Long, Tuple2<DoubleValue, DoubleValue>>> result = graph.run(new HITSAlgorithm<Long, Double, NullValue>(10, 5)).collect();
-
- compareWithDelta(result, 1e-7);
- }
-
- @Test
public void testHITSWithConvergeThreshold() throws Exception {
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
@@ -83,20 +69,6 @@ public class HITSAlgorithmITCase extends MultipleProgramsTestBase{
compareWithDelta(result, 1e-7);
}
- @Test
- public void testHITSWithConvergeThresholdAndNumOfVertices() throws Exception {
- final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
-
- Graph<Long, Double, NullValue> graph = Graph.fromDataSet(
- HITSData.getVertexDataSet(env),
- HITSData.getEdgeDataSet(env),
- env);
-
- List<Vertex<Long, Tuple2<DoubleValue, DoubleValue>>> result = graph.run(new HITSAlgorithm<Long, Double, NullValue>(1e-7, 5)).collect();
-
- compareWithDelta(result, 1e-7);
- }
-
private void compareWithDelta(List<Vertex<Long, Tuple2<DoubleValue, DoubleValue>>> result, double delta) {
String resultString = "";
http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
index 034bcd5..e3e8f08 100644
--- a/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
+++ b/flink-libraries/flink-gelly-examples/src/test/java/org/apache/flink/graph/library/PageRankITCase.java
@@ -18,9 +18,6 @@
package org.apache.flink.graph.library;
-import java.util.Arrays;
-import java.util.List;
-
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.Graph;
@@ -32,6 +29,9 @@ import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
+import java.util.Arrays;
+import java.util.List;
+
@RunWith(Parameterized.class)
public class PageRankITCase extends MultipleProgramsTestBase {
@@ -72,9 +72,9 @@ public class PageRankITCase extends MultipleProgramsTestBase {
Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
- List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 5, 3))
+ List<Vertex<Long, Double>> result = inputGraph.run(new PageRank<Long>(0.85, 3))
.collect();
-
+
compareWithDelta(result, 0.01);
}
@@ -85,14 +85,13 @@ public class PageRankITCase extends MultipleProgramsTestBase {
Graph<Long, Double, Double> inputGraph = Graph.fromDataSet(
PageRankData.getDefaultEdgeDataSet(env), new InitMapper(), env);
- List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 5, 3))
+ List<Vertex<Long, Double>> result = inputGraph.run(new GSAPageRank<Long>(0.85, 3))
.collect();
compareWithDelta(result, 0.01);
}
- private void compareWithDelta(List<Vertex<Long, Double>> result,
- double delta) {
+ private void compareWithDelta(List<Vertex<Long, Double>> result, double delta) {
String resultString = "";
for (Vertex<Long, Double> v : result) {
http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
index d092086..d1b12f9 100755
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/gsa/GatherSumApplyIteration.java
@@ -41,9 +41,12 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.GraphUtils;
+import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
+import java.util.Collection;
import java.util.Map;
/**
@@ -125,12 +128,11 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
// check whether the numVertices option is set and, if so, compute the total number of vertices
// and set it within the gather, sum and apply functions
+
+ DataSet<LongValue> numberOfVertices = null;
if (this.configuration != null && this.configuration.isOptNumVertices()) {
try {
- long numberOfVertices = graph.numberOfVertices();
- gather.setNumberOfVertices(numberOfVertices);
- sum.setNumberOfVertices(numberOfVertices);
- apply.setNumberOfVertices(numberOfVertices);
+ numberOfVertices = GraphUtils.count(this.vertexDataSet);
} catch (Exception e) {
e.printStackTrace();
}
@@ -203,6 +205,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
for (Tuple2<String, DataSet<?>> e : this.configuration.getGatherBcastVars()) {
gatherMapOperator = gatherMapOperator.withBroadcastSet(e.f1, e.f0);
}
+ if (this.configuration.isOptNumVertices()) {
+ gatherMapOperator = gatherMapOperator.withBroadcastSet(numberOfVertices, "number of vertices");
+ }
}
DataSet<Tuple2<K, M>> gatheredSet = gatherMapOperator;
@@ -215,6 +220,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
for (Tuple2<String, DataSet<?>> e : this.configuration.getSumBcastVars()) {
sumReduceOperator = sumReduceOperator.withBroadcastSet(e.f1, e.f0);
}
+ if (this.configuration.isOptNumVertices()) {
+ sumReduceOperator = sumReduceOperator.withBroadcastSet(numberOfVertices, "number of vertices");
+ }
}
DataSet<Tuple2<K, M>> summedSet = sumReduceOperator;
@@ -231,6 +239,9 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
for (Tuple2<String, DataSet<?>> e : this.configuration.getApplyBcastVars()) {
appliedSet = appliedSet.withBroadcastSet(e.f1, e.f0);
}
+ if (this.configuration.isOptNumVertices()) {
+ appliedSet = appliedSet.withBroadcastSet(numberOfVertices, "number of vertices");
+ }
}
// let the operator know that we preserve the key field
@@ -289,6 +300,10 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
@Override
public void open(Configuration parameters) throws Exception {
+ if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
+ Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
+ this.gatherFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+ }
if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
this.gatherFunction.init(getIterationRuntimeContext());
}
@@ -327,6 +342,10 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
@Override
public void open(Configuration parameters) throws Exception {
+ if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
+ Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
+ this.sumFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+ }
if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
this.sumFunction.init(getIterationRuntimeContext());
}
@@ -365,6 +384,10 @@ public class GatherSumApplyIteration<K, VV, EV, M> implements CustomUnaryOperati
@Override
public void open(Configuration parameters) throws Exception {
+ if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
+ Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
+ this.applyFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+ }
if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
this.applyFunction.init(getIterationRuntimeContext());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
index 99624ca..324f9c3 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/GSAPageRank.java
@@ -25,6 +25,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.gsa.ApplyFunction;
+import org.apache.flink.graph.gsa.GSAConfiguration;
import org.apache.flink.graph.gsa.GatherFunction;
import org.apache.flink.graph.gsa.Neighbor;
import org.apache.flink.graph.gsa.SumFunction;
@@ -32,22 +33,17 @@ import org.apache.flink.graph.gsa.SumFunction;
/**
* This is an implementation of a simple PageRank algorithm, using a gather-sum-apply iteration.
* The user can define the damping factor and the maximum number of iterations.
- * If the number of vertices of the input graph is known, it should be provided as a parameter
- * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
- *
+ *
* The implementation assumes that each page has at least one incoming and one outgoing link.
*/
public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
private double beta;
private int maxIterations;
- private long numberOfVertices;
/**
* Creates an instance of the GSA PageRank algorithm.
- * If the number of vertices of the input graph is known,
- * use the {@link GSAPageRank#GSAPageRank(double, long, int)} constructor instead.
- *
+ *
* The implementation assumes that each page has at least one incoming and one outgoing link.
*
* @param beta the damping factor
@@ -58,37 +54,19 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
this.maxIterations = maxIterations;
}
- /**
- * Creates an instance of the GSA PageRank algorithm.
- * If the number of vertices of the input graph is known,
- * use the {@link GSAPageRank#GSAPageRank(double, int)} constructor instead.
- *
- * The implementation assumes that each page has at least one incoming and one outgoing link.
- *
- * @param beta the damping factor
- * @param maxIterations the maximum number of iterations
- * @param numVertices the number of vertices in the input
- */
- public GSAPageRank(double beta, long numVertices, int maxIterations) {
- this.beta = beta;
- this.numberOfVertices = numVertices;
- this.maxIterations = maxIterations;
- }
-
@Override
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
- if (numberOfVertices == 0) {
- numberOfVertices = network.numberOfVertices();
- }
-
DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
Graph<K, Double, Double> networkWithWeights = network
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
- return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(numberOfVertices), new SumRanks(),
- new UpdateRanks<K>(beta, numberOfVertices), maxIterations)
+ GSAConfiguration parameters = new GSAConfiguration();
+ parameters.setOptNumVertices(true);
+
+ return networkWithWeights.runGatherSumApplyIteration(new GatherRanks(), new SumRanks(),
+ new UpdateRanks<K>(beta), maxIterations, parameters)
.getVertices();
}
@@ -99,18 +77,12 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
@SuppressWarnings("serial")
private static final class GatherRanks extends GatherFunction<Double, Double, Double> {
- long numberOfVertices;
-
- public GatherRanks(long numberOfVertices) {
- this.numberOfVertices = numberOfVertices;
- }
-
@Override
public Double gather(Neighbor<Double, Double> neighbor) {
double neighborRank = neighbor.getNeighborValue();
if(getSuperstepNumber() == 1) {
- neighborRank = 1.0 / numberOfVertices;
+ neighborRank = 1.0 / this.getNumberOfVertices();
}
return neighborRank * neighbor.getEdgeValue();
@@ -130,16 +102,14 @@ public class GSAPageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet
private static final class UpdateRanks<K> extends ApplyFunction<K, Double, Double> {
private final double beta;
- private final long numVertices;
- public UpdateRanks(double beta, long numberOfVertices) {
+ public UpdateRanks(double beta) {
this.beta = beta;
- this.numVertices = numberOfVertices;
}
@Override
public void apply(Double rankSum, Double currentValue) {
- setResult((1-beta)/numVertices + beta * rankSum);
+ setResult((1-beta)/this.getNumberOfVertices() + beta * rankSum);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
index 1ea367e..39e9487 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/HITSAlgorithm.java
@@ -42,8 +42,6 @@ import org.apache.flink.util.Preconditions;
* represented a page that is linked by many different hubs.
* Each vertex has a value of Tuple2 type, the first field is hub score and the second field is authority score.
* The implementation sets same score to every vertex and adds the reverse edge to every edge at the beginning.
- * If the number of vertices of the input graph is known, it should be provided as a parameter
- * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
* <p>
*
* @see <a href="https://en.wikipedia.org/wiki/HITS_algorithm">HITS Algorithm</a>
@@ -54,7 +52,6 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
private final static double MINIMUMTHRESHOLD = 1e-9;
private int maxIterations;
- private long numberOfVertices;
private double convergeThreshold;
/**
@@ -76,26 +73,6 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
}
/**
- * Create an instance of HITS algorithm.
- *
- * @param maxIterations the maximum number of iterations
- * @param numberOfVertices the number of vertices in the graph
- */
- public HITSAlgorithm(int maxIterations, long numberOfVertices) {
- this(maxIterations, MINIMUMTHRESHOLD, numberOfVertices);
- }
-
- /**
- * Create an instance of HITS algorithm.
- *
- * @param convergeThreshold convergence threshold for sum of scores to control whether the iteration should be stopped
- * @param numberOfVertices the number of vertices in the graph
- */
- public HITSAlgorithm(double convergeThreshold, long numberOfVertices) {
- this(MAXIMUMITERATION, convergeThreshold, numberOfVertices);
- }
-
- /**
* Creates an instance of HITS algorithm.
*
* @param maxIterations the maximum number of iterations
@@ -108,26 +85,8 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
this.convergeThreshold = convergeThreshold;
}
- /**
- * Creates an instance of HITS algorithm.
- *
- * @param maxIterations the maximum number of iterations
- * @param convergeThreshold convergence threshold for sum of scores to control whether the iteration should be stopped
- * @param numberOfVertices the number of vertices in the graph
- */
- public HITSAlgorithm(int maxIterations, double convergeThreshold, long numberOfVertices) {
- this(maxIterations, convergeThreshold);
- Preconditions.checkArgument(numberOfVertices > 0, "Number of vertices must be greater than zero.");
- this.numberOfVertices = numberOfVertices;
- }
-
@Override
public DataSet<Vertex<K, Tuple2<DoubleValue, DoubleValue>>> run(Graph<K, VV, EV> graph) throws Exception {
-
- if (numberOfVertices == 0) {
- numberOfVertices = graph.numberOfVertices();
- }
-
Graph<K, Tuple2<DoubleValue, DoubleValue>, Boolean> newGraph = graph
.mapEdges(new AuthorityEdgeMapper<K, EV>())
.union(graph.reverse().mapEdges(new HubEdgeMapper<K, EV>()))
@@ -135,12 +94,13 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
ScatterGatherConfiguration parameter = new ScatterGatherConfiguration();
parameter.setDirection(EdgeDirection.OUT);
+ parameter.setOptNumVertices(true);
parameter.registerAggregator("updatedValueSum", new DoubleSumAggregator());
parameter.registerAggregator("authorityValueSum", new DoubleSumAggregator());
parameter.registerAggregator("diffValueSum", new DoubleSumAggregator());
return newGraph
- .runScatterGatherIteration(new VertexUpdate<K>(maxIterations, convergeThreshold, numberOfVertices),
+ .runScatterGatherIteration(new VertexUpdate<K>(maxIterations, convergeThreshold),
new MessageUpdate<K>(maxIterations), maxIterations, parameter)
.getVertices();
}
@@ -153,15 +113,13 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
public static final class VertexUpdate<K> extends VertexUpdateFunction<K, Tuple2<DoubleValue, DoubleValue>, Double> {
private int maxIteration;
private double convergeThreshold;
- private long numberOfVertices;
private DoubleSumAggregator updatedValueSumAggregator;
private DoubleSumAggregator authoritySumAggregator;
private DoubleSumAggregator diffSumAggregator;
- public VertexUpdate(int maxIteration, double convergeThreshold, long numberOfVertices) {
+ public VertexUpdate(int maxIteration, double convergeThreshold) {
this.maxIteration = maxIteration;
this.convergeThreshold = convergeThreshold;
- this.numberOfVertices = numberOfVertices;
}
@Override
@@ -198,9 +156,9 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
//in the first iteration, the diff is the authority value of each vertex
double previousAuthAverage = 1.0;
- double diffValueSum = 1.0 * numberOfVertices;
+ double diffValueSum = 1.0 * getNumberOfVertices();
if (getSuperstepNumber() > 1) {
- previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / numberOfVertices;
+ previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / getNumberOfVertices();
diffValueSum = ((DoubleValue) getPreviousIterationAggregate("diffValueSum")).getValue();
}
authoritySumAggregator.aggregate(previousAuthAverage);
@@ -218,7 +176,7 @@ public class HITSAlgorithm<K, VV, EV> implements GraphAlgorithm<K, VV, EV, DataS
newHubValue.setValue(updateValue);
newAuthorityValue.setValue(newAuthorityValue.getValue() / iterationValueSum);
authoritySumAggregator.aggregate(newAuthorityValue.getValue());
- double previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / numberOfVertices;
+ double previousAuthAverage = ((DoubleValue) getPreviousIterationAggregate("authorityValueSum")).getValue() / getNumberOfVertices();
// count the diff value of sum of authority scores
diffSumAggregator.aggregate((previousAuthAverage - newAuthorityValue.getValue()));
http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
index 9890a7c..f83b05b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/PageRank.java
@@ -27,27 +27,23 @@ import org.apache.flink.graph.GraphAlgorithm;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.spargel.MessageIterator;
import org.apache.flink.graph.spargel.MessagingFunction;
+import org.apache.flink.graph.spargel.ScatterGatherConfiguration;
import org.apache.flink.graph.spargel.VertexUpdateFunction;
/**
* This is an implementation of a simple PageRank algorithm, using a scatter-gather iteration.
* The user can define the damping factor and the maximum number of iterations.
- * If the number of vertices of the input graph is known, it should be provided as a parameter
- * to speed up computation. Otherwise, the algorithm will first execute a job to count the vertices.
- *
+ *
* The implementation assumes that each page has at least one incoming and one outgoing link.
*/
public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Vertex<K, Double>>> {
private double beta;
private int maxIterations;
- private long numberOfVertices;
/**
* Creates an instance of the PageRank algorithm.
- * If the number of vertices of the input graph is known,
- * use the {@link PageRank#PageRank(double, long, int)} constructor instead.
- *
+ *
* The implementation assumes that each page has at least one incoming and one outgoing link.
*
* @param beta the damping factor
@@ -56,40 +52,21 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
public PageRank(double beta, int maxIterations) {
this.beta = beta;
this.maxIterations = maxIterations;
- this.numberOfVertices = 0;
- }
-
- /**
- * Creates an instance of the PageRank algorithm.
- * If the number of vertices of the input graph is known,
- * use the {@link PageRank#PageRank(double, int)} constructor instead.
- *
- * The implementation assumes that each page has at least one incoming and one outgoing link.
- *
- * @param beta the damping factor
- * @param maxIterations the maximum number of iterations
- * @param numVertices the number of vertices in the input
- */
- public PageRank(double beta, long numVertices, int maxIterations) {
- this.beta = beta;
- this.maxIterations = maxIterations;
- this.numberOfVertices = numVertices;
}
@Override
public DataSet<Vertex<K, Double>> run(Graph<K, Double, Double> network) throws Exception {
- if (numberOfVertices == 0) {
- numberOfVertices = network.numberOfVertices();
- }
-
DataSet<Tuple2<K, Long>> vertexOutDegrees = network.outDegrees();
Graph<K, Double, Double> networkWithWeights = network
.joinWithEdgesOnSource(vertexOutDegrees, new InitWeights());
- return networkWithWeights.runScatterGatherIteration(new VertexRankUpdater<K>(beta, numberOfVertices),
- new RankMessenger<K>(numberOfVertices), maxIterations)
+ ScatterGatherConfiguration parameters = new ScatterGatherConfiguration();
+ parameters.setOptNumVertices(true);
+
+ return networkWithWeights.runScatterGatherIteration(new VertexRankUpdater<K>(beta),
+ new RankMessenger<K>(), maxIterations, parameters)
.getVertices();
}
@@ -101,11 +78,9 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
public static final class VertexRankUpdater<K> extends VertexUpdateFunction<K, Double, Double> {
private final double beta;
- private final long numVertices;
-
- public VertexRankUpdater(double beta, long numberOfVertices) {
+
+ public VertexRankUpdater(double beta) {
this.beta = beta;
- this.numVertices = numberOfVertices;
}
@Override
@@ -116,7 +91,7 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
}
// apply the dampening factor / random jump
- double newRank = (beta * rankSum) + (1 - beta) / numVertices;
+ double newRank = (beta * rankSum) + (1 - beta) / this.getNumberOfVertices();
setNewVertexValue(newRank);
}
}
@@ -129,17 +104,11 @@ public class PageRank<K> implements GraphAlgorithm<K, Double, Double, DataSet<Ve
@SuppressWarnings("serial")
public static final class RankMessenger<K> extends MessagingFunction<K, Double, Double, Double> {
- private final long numVertices;
-
- public RankMessenger(long numberOfVertices) {
- this.numVertices = numberOfVertices;
- }
-
@Override
public void sendMessages(Vertex<K, Double> vertex) {
if (getSuperstepNumber() == 1) {
// initialize vertex ranks
- vertex.setValue(new Double(1.0 / numVertices));
+ vertex.setValue(1.0 / this.getNumberOfVertices());
}
for (Edge<K, Double> edge : getEdges()) {
http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
index 496e36d..165ef1e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/spargel/ScatterGatherIteration.java
@@ -18,18 +18,15 @@
package org.apache.flink.graph.spargel;
-import java.util.Iterator;
-import java.util.Map;
-
import org.apache.flink.api.common.aggregators.Aggregator;
import org.apache.flink.api.common.functions.FlatJoinFunction;
import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.common.functions.RichCoGroupFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.operators.CoGroupOperator;
import org.apache.flink.api.java.operators.CustomUnaryOperation;
+import org.apache.flink.api.java.operators.DeltaIteration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.typeutils.ResultTypeQueryable;
@@ -40,9 +37,15 @@ import org.apache.flink.graph.Edge;
import org.apache.flink.graph.EdgeDirection;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.utils.GraphUtils;
+import org.apache.flink.types.LongValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+
/**
* This class represents iterative graph computations, programmed in a scatter-gather perspective.
* It is a special case of <i>Bulk Synchronous Parallel</i> computation.
@@ -151,11 +154,10 @@ public class ScatterGatherIteration<K, VV, Message, EV>
// check whether the numVertices option is set and, if so, compute the total number of vertices
// and set it within the messaging and update functions
+ DataSet<LongValue> numberOfVertices = null;
if (this.configuration != null && this.configuration.isOptNumVertices()) {
try {
- long numberOfVertices = graph.numberOfVertices();
- messagingFunction.setNumberOfVertices(numberOfVertices);
- updateFunction.setNumberOfVertices(numberOfVertices);
+ numberOfVertices = GraphUtils.count(this.initialVertices);
} catch (Exception e) {
e.printStackTrace();
}
@@ -173,9 +175,9 @@ public class ScatterGatherIteration<K, VV, Message, EV>
// check whether the degrees option is set and, if so, compute the in and the out degrees and
// add them to the vertex value
if(this.configuration != null && this.configuration.isOptDegrees()) {
- return createResultVerticesWithDegrees(graph, messagingDirection, messageTypeInfo);
+ return createResultVerticesWithDegrees(graph, messagingDirection, messageTypeInfo, numberOfVertices);
} else {
- return createResultSimpleVertex(messagingDirection, messageTypeInfo);
+ return createResultSimpleVertex(messagingDirection, messageTypeInfo, numberOfVertices);
}
}
@@ -246,6 +248,10 @@ public class ScatterGatherIteration<K, VV, Message, EV>
@Override
public void open(Configuration parameters) throws Exception {
+ if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
+ Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
+ this.vertexUpdateFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+ }
if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
this.vertexUpdateFunction.init(getIterationRuntimeContext());
}
@@ -368,10 +374,13 @@ public class ScatterGatherIteration<K, VV, Message, EV>
@Override
public void open(Configuration parameters) throws Exception {
+ if (getRuntimeContext().hasBroadcastVariable("number of vertices")) {
+ Collection<LongValue> numberOfVertices = getRuntimeContext().getBroadcastVariable("number of vertices");
+ this.messagingFunction.setNumberOfVertices(numberOfVertices.iterator().next().getValue());
+ }
if (getIterationRuntimeContext().getSuperstepNumber() == 1) {
this.messagingFunction.init(getIterationRuntimeContext());
}
-
this.messagingFunction.preSuperstep();
}
@@ -459,7 +468,8 @@ public class ScatterGatherIteration<K, VV, Message, EV>
*/
private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunction(
DeltaIteration<Vertex<K, VV>, Vertex<K, VV>> iteration,
- TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+ TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg,
+ DataSet<LongValue> numberOfVertices) {
// build the messaging function (co group)
CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
@@ -475,6 +485,9 @@ public class ScatterGatherIteration<K, VV, Message, EV>
for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
messages = messages.withBroadcastSet(e.f1, e.f0);
}
+ if (this.configuration.isOptNumVertices()) {
+ messages = messages.withBroadcastSet(numberOfVertices, "number of vertices");
+ }
}
return messages;
@@ -493,7 +506,8 @@ public class ScatterGatherIteration<K, VV, Message, EV>
*/
private CoGroupOperator<?, ?, Tuple2<K, Message>> buildMessagingFunctionVerticesWithDegrees(
DeltaIteration<Vertex<K, Tuple3<VV, Long, Long>>, Vertex<K, Tuple3<VV, Long, Long>>> iteration,
- TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg) {
+ TypeInformation<Tuple2<K, Message>> messageTypeInfo, int whereArg, int equalToArg,
+ DataSet<LongValue> numberOfVertices) {
// build the messaging function (co group)
CoGroupOperator<?, ?, Tuple2<K, Message>> messages;
@@ -510,6 +524,9 @@ public class ScatterGatherIteration<K, VV, Message, EV>
for (Tuple2<String, DataSet<?>> e : this.configuration.getMessagingBcastVars()) {
messages = messages.withBroadcastSet(e.f1, e.f0);
}
+ if (this.configuration.isOptNumVertices()) {
+ messages = messages.withBroadcastSet(numberOfVertices, "number of vertices");
+ }
}
return messages;
@@ -546,10 +563,11 @@ public class ScatterGatherIteration<K, VV, Message, EV>
*
* @param messagingDirection
* @param messageTypeInfo
+ * @param numberOfVertices
* @return the operator
*/
private DataSet<Vertex<K, VV>> createResultSimpleVertex(EdgeDirection messagingDirection,
- TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
+ TypeInformation<Tuple2<K, Message>> messageTypeInfo, DataSet<LongValue> numberOfVertices) {
DataSet<Tuple2<K, Message>> messages;
@@ -561,14 +579,14 @@ public class ScatterGatherIteration<K, VV, Message, EV>
switch (messagingDirection) {
case IN:
- messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0);
+ messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices);
break;
case OUT:
- messages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0);
+ messages = buildMessagingFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices);
break;
case ALL:
- messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0)
- .union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0)) ;
+ messages = buildMessagingFunction(iteration, messageTypeInfo, 1, 0, numberOfVertices)
+ .union(buildMessagingFunction(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
break;
default:
throw new IllegalArgumentException("Illegal edge direction");
@@ -581,6 +599,10 @@ public class ScatterGatherIteration<K, VV, Message, EV>
CoGroupOperator<?, ?, Vertex<K, VV>> updates =
messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+ if (this.configuration != null && this.configuration.isOptNumVertices()) {
+ updates = updates.withBroadcastSet(numberOfVertices, "number of vertices");
+ }
+
configureUpdateFunction(updates);
return iteration.closeWith(updates, updates);
@@ -593,11 +615,12 @@ public class ScatterGatherIteration<K, VV, Message, EV>
* @param graph
* @param messagingDirection
* @param messageTypeInfo
+ * @param numberOfVertices
* @return the operator
*/
@SuppressWarnings("serial")
private DataSet<Vertex<K, VV>> createResultVerticesWithDegrees(Graph<K, VV, EV> graph, EdgeDirection messagingDirection,
- TypeInformation<Tuple2<K, Message>> messageTypeInfo) {
+ TypeInformation<Tuple2<K, Message>> messageTypeInfo, DataSet<LongValue> numberOfVertices) {
DataSet<Tuple2<K, Message>> messages;
@@ -636,14 +659,14 @@ public class ScatterGatherIteration<K, VV, Message, EV>
switch (messagingDirection) {
case IN:
- messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0);
+ messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices);
break;
case OUT:
- messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0);
+ messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices);
break;
case ALL:
- messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0)
- .union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0)) ;
+ messages = buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 1, 0, numberOfVertices)
+ .union(buildMessagingFunctionVerticesWithDegrees(iteration, messageTypeInfo, 0, 0, numberOfVertices)) ;
break;
default:
throw new IllegalArgumentException("Illegal edge direction");
@@ -657,6 +680,10 @@ public class ScatterGatherIteration<K, VV, Message, EV>
CoGroupOperator<?, ?, Vertex<K, Tuple3<VV, Long, Long>>> updates =
messages.coGroup(iteration.getSolutionSet()).where(0).equalTo(0).with(updateUdf);
+ if (this.configuration != null && this.configuration.isOptNumVertices()) {
+ updates = updates.withBroadcastSet(numberOfVertices, "number of vertices");
+ }
+
configureUpdateFunction(updates);
return iteration.closeWith(updates, updates).map(
http://git-wip-us.apache.org/repos/asf/flink/blob/36ad78c0/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
index 009d791..264479b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/GraphUtils.java
@@ -19,12 +19,18 @@
package org.apache.flink.graph.utils;
import org.apache.flink.api.common.JobExecutionResult;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
+import org.apache.flink.types.LongValue;
import org.apache.flink.util.AbstractID;
+import static org.apache.flink.api.java.typeutils.ValueTypeInfo.LONG_VALUE_TYPE_INFO;
+
public class GraphUtils {
/**
@@ -50,4 +56,56 @@ public class GraphUtils {
return checksum;
}
+
+ /**
+ * Count the number of elements in a DataSet.
+ *
+ * @param input DataSet of elements to be counted
+ * @param <T> element type
+ * @return count
+ */
+ public static <T> DataSet<LongValue> count(DataSet<T> input) {
+ return input
+ .map(new MapTo<T, LongValue>(new LongValue(1)))
+ .returns(LONG_VALUE_TYPE_INFO)
+ .reduce(new AddLongValue());
+ }
+
+ /**
+ * Map each element to a value.
+ *
+ * @param <I> input type
+ * @param <O> output type
+ */
+ public static class MapTo<I, O>
+ implements MapFunction<I, O> {
+ private final O value;
+
+ /**
+ * Map each element to the given object.
+ *
+ * @param value the object to emit for each element
+ */
+ public MapTo(O value) {
+ this.value = value;
+ }
+
+ @Override
+ public O map(I o) throws Exception {
+ return value;
+ }
+ }
+
+ /**
+ * Add {@link LongValue} elements.
+ */
+ public static class AddLongValue
+ implements ReduceFunction<LongValue> {
+ @Override
+ public LongValue reduce(LongValue value1, LongValue value2)
+ throws Exception {
+ value1.setValue(value1.getValue() + value2.getValue());
+ return value1;
+ }
+ }
}
[2/2] flink git commit: [FLINK-3945] [gelly] Degree annotation for
directed graphs
Posted by gr...@apache.org.
[FLINK-3945] [gelly] Degree annotation for directed graphs
This closes #2021
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/65545c2e
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/65545c2e
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/65545c2e
Branch: refs/heads/master
Commit: 65545c2ed46c17df6abd85299b91bad2529cd42c
Parents: 36ad78c
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri May 20 12:54:16 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Jun 2 10:22:29 2016 -0400
----------------------------------------------------------------------
docs/apis/batch/libs/gelly.md | 55 +++-
.../java/org/apache/flink/graph/EdgeOrder.java | 47 ++++
.../annotate/DegreeAnnotationFunctions.java | 88 +------
.../annotate/directed/EdgeDegreesPair.java | 81 ++++++
.../annotate/directed/EdgeSourceDegrees.java | 75 ++++++
.../annotate/directed/EdgeTargetDegrees.java | 75 ++++++
.../annotate/directed/VertexDegreePair.java | 107 --------
.../degree/annotate/directed/VertexDegrees.java | 262 +++++++++++++++++++
.../annotate/directed/VertexInDegree.java | 7 +-
.../annotate/directed/VertexOutDegree.java | 7 +-
.../graph/asm/degree/annotate/package-info.java | 27 +-
.../annotate/undirected/EdgeDegreePair.java | 9 +-
.../annotate/undirected/EdgeSourceDegree.java | 4 +-
.../annotate/undirected/EdgeTargetDegree.java | 4 +-
.../annotate/undirected/VertexDegree.java | 3 +-
.../annotate/directed/EdgeDegreesPairTest.java | 66 +++++
.../directed/EdgeSourceDegreesTest.java | 66 +++++
.../directed/EdgeTargetDegreesTest.java | 66 +++++
.../annotate/directed/VertexDegreePairTest.java | 87 ------
.../annotate/directed/VertexDegreesTest.java | 104 ++++++++
.../annotate/directed/VertexInDegreeTest.java | 7 +-
.../annotate/directed/VertexOutDegreeTest.java | 6 +-
22 files changed, 947 insertions(+), 306 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index aadbd44..05fbcb5 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -2164,12 +2164,12 @@ DataSet<Vertex<K, LongValue>> outDegree = graph
</tr>
<tr>
- <td>degree.annotate.directed.<br/><strong>VertexDegreePair</strong></td>
+ <td>degree.annotate.directed.<br/><strong>VertexDegrees</strong></td>
<td>
- <p>Annotate vertices of a <a href="#graph-representation">directed graph</a> with both the out-degree and in-degree.</p>
+ <p>Annotate vertices of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree.</p>
{% highlight java %}
-DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> pairDegree = graph
- .run(new VertexDegreePair()
+DataSet<Vertex<K, Degrees>> degrees = graph
+ .run(new VertexDegrees()
.setIncludeZeroDegreeVertices(true));
{% endhighlight %}
<p>Optional configuration:</p>
@@ -2181,6 +2181,51 @@ DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> pairDegree = graph
</tr>
<tr>
+ <td>degree.annotate.directed.<br/><strong>EdgeSourceDegrees</strong></td>
+ <td>
+ <p>Annotate edges of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree of the source ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, Degrees>>> sourceDegrees = graph
+ .run(new EdgeSourceDegrees());
+{% endhighlight %}
+ <p>Optional configuration:</p>
+ <ul>
+ <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+ </ul>
+ </td>
+ </tr>
+
+ <tr>
+ <td>degree.annotate.directed.<br/><strong>EdgeTargetDegrees</strong></td>
+ <td>
+ <p>Annotate edges of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree of the target ID.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple2<EV, Degrees>>> targetDegrees = graph
+    .run(new EdgeTargetDegrees());
+{% endhighlight %}
+ <p>Optional configuration:</p>
+ <ul>
+ <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+ </ul>
+ </td>
+ </tr>
+
+ <tr>
+ <td>degree.annotate.directed.<br/><strong>EdgeDegreesPair</strong></td>
+ <td>
+ <p>Annotate edges of a <a href="#graph-representation">directed graph</a> with the degree, out-degree, and in-degree of both the source and target vertices.</p>
+{% highlight java %}
+DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> degrees = graph
+ .run(new EdgeDegreesPair());
+{% endhighlight %}
+ <p>Optional configuration:</p>
+ <ul>
+ <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+ </ul>
+ </td>
+ </tr>
+
+ <tr>
<td>degree.annotate.undirected.<br/><strong>VertexDegree</strong></td>
<td>
<p>Annotate vertices of an <a href="#graph-representation">undirected graph</a> with the degree.</p>
@@ -2236,7 +2281,7 @@ DataSet<Edge<K, Tuple2<EV, LongValue>>> targetDegree = graph
<tr>
<td>degree.annotate.undirected.<br/><strong>EdgeDegreePair</strong></td>
<td>
- <p>Annotate edges of an <a href="#graph-representation">undirected graph</a> with the degree of both the source and target degree ID.</p>
+ <p>Annotate edges of an <a href="#graph-representation">undirected graph</a> with the degree of both the source and target vertices.</p>
{% highlight java %}
DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = graph
.run(new EdgeDegreePair()
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java
new file mode 100644
index 0000000..08e955a
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/EdgeOrder.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph;
+
+/**
+ * These bitmasks are used by edge-flipping algorithms to mark the edge order
+ * relative to the original edge direction.
+ */
+public enum EdgeOrder {
+
+ FORWARD(0b01),
+ REVERSE(0b10),
+ MUTUAL(0b11);
+
+ private final byte bitmask;
+
+ EdgeOrder(int bitmask) {
+ this.bitmask = (byte)bitmask;
+ }
+
+ /**
+ * Returns a bitmask used for marking whether an edge is in the same
+ * direction as in the original edge set (FORWARD), is flipped relative
+ * to the original edge set (REVERSE), or both (MUTUAL).
+ *
+ * @return edge order bitmask
+ */
+ public byte getBitmask() {
+ return bitmask;
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
index 098e9fe..b91b4cb 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/DegreeAnnotationFunctions.java
@@ -98,7 +98,7 @@ public class DegreeAnnotationFunctions {
*/
@ForwardedFieldsFirst("0")
@ForwardedFieldsSecond("0")
- public static final class JoinVertexWithVertexDegree<K, VV>
+ public static class JoinVertexWithVertexDegree<K, VV>
implements JoinFunction<Vertex<K, VV>, Vertex<K, LongValue>, Vertex<K, LongValue>> {
private LongValue zero = new LongValue(0);
@@ -114,68 +114,6 @@ public class DegreeAnnotationFunctions {
}
}
- /**
- * Performs a left outer join to apply a zero count for vertices with
- * out- and in-degree of zero.
- *
- * @param <K> ID type
- * @param <VV> vertex value type
- */
- @ForwardedFieldsFirst("0")
- @ForwardedFieldsSecond("0")
- public static final class JoinVertexWithVertexDegrees<K, VV>
- implements JoinFunction<Vertex<K, VV>, Vertex<K, Tuple2<LongValue, LongValue>>, Vertex<K, Tuple2<LongValue, LongValue>>> {
- private Tuple2<LongValue, LongValue> zeros;
-
- private Vertex<K, Tuple2<LongValue, LongValue>> output = new Vertex<>();
-
- public JoinVertexWithVertexDegrees() {
- LongValue zero = new LongValue(0);
- zeros = new Tuple2<>(zero, zero);
- }
- @Override
- public Vertex<K, Tuple2<LongValue, LongValue>> join(Vertex<K, VV> vertex, Vertex<K, Tuple2<LongValue, LongValue>> vertexDegree)
- throws Exception {
- output.f0 = vertex.f0;
- output.f1 = (vertexDegree == null) ? zeros : vertexDegree.f1;
-
- return output;
- }
- }
-
- /**
- * Performs a full outer join composing vertex out- and in-degree and
- * applying a zero count for vertices having an out- or in-degree of zero.
- *
- * @param <K> ID type
- */
- @ForwardedFieldsFirst("0")
- @ForwardedFieldsSecond("0")
- public static final class JoinVertexDegreeWithVertexDegree<K>
- implements JoinFunction<Vertex<K, LongValue>, Vertex<K, LongValue>, Vertex<K, Tuple2<LongValue, LongValue>>> {
- private LongValue zero = new LongValue(0);
-
- private Tuple2<LongValue, LongValue> degrees = new Tuple2<>();
-
- private Vertex<K, Tuple2<LongValue, LongValue>> output = new Vertex<>(null, degrees);
-
- @Override
- public Vertex<K, Tuple2<LongValue, LongValue>> join(Vertex<K, LongValue> left, Vertex<K, LongValue> right)
- throws Exception {
- if (left == null) {
- output.f0 = right.f0;
- degrees.f0 = zero;
- degrees.f1 = right.f1;
- } else {
- output.f0 = left.f0;
- degrees.f0 = left.f1;
- degrees.f1 = (right == null) ? zero : right.f1;
- }
-
- return output;
- }
- }
-
// --------------------------------------------------------------------------------------------
// Edge functions
// --------------------------------------------------------------------------------------------
@@ -185,17 +123,18 @@ public class DegreeAnnotationFunctions {
*
* @param <K> ID type
* @param <EV> edge value type
+ * @param <D> degree type
*/
@ForwardedFieldsFirst("0; 1; 2->2.0")
@ForwardedFieldsSecond("0; 1->2.1")
- public static final class JoinEdgeWithVertexDegree<K, EV>
- implements JoinFunction<Edge<K, EV>, Vertex<K, LongValue>, Edge<K, Tuple2<EV, LongValue>>> {
- private Tuple2<EV, LongValue> valueAndDegree = new Tuple2<>();
+ public static class JoinEdgeWithVertexDegree<K, EV, D>
+ implements JoinFunction<Edge<K, EV>, Vertex<K, D>, Edge<K, Tuple2<EV, D>>> {
+ private Tuple2<EV, D> valueAndDegree = new Tuple2<>();
- private Edge<K, Tuple2<EV, LongValue>> output = new Edge<>(null, null, valueAndDegree);
+ private Edge<K, Tuple2<EV, D>> output = new Edge<>(null, null, valueAndDegree);
@Override
- public Edge<K, Tuple2<EV, LongValue>> join(Edge<K, EV> edge, Vertex<K, LongValue> vertex) throws Exception {
+ public Edge<K, Tuple2<EV, D>> join(Edge<K, EV> edge, Vertex<K, D> vertex) throws Exception {
output.f0 = edge.f0;
output.f1 = edge.f1;
valueAndDegree.f0 = edge.f2;
@@ -210,19 +149,20 @@ public class DegreeAnnotationFunctions {
*
* @param <K> ID type
* @param <EV> edge value type
+ * @param <D> degree type
*/
@ForwardedFieldsFirst("0; 1; 2.0; 2.1")
@ForwardedFieldsSecond("0; 1->2.2")
- public static final class JoinEdgeDegreeWithVertexDegree<K, EV>
- implements JoinFunction<Edge<K, Tuple2<EV, LongValue>>, Vertex<K, LongValue>, Edge<K, Tuple3<EV, LongValue, LongValue>>> {
- private Tuple3<EV, LongValue, LongValue> valueAndDegrees = new Tuple3<>();
+ public static class JoinEdgeDegreeWithVertexDegree<K, EV, D>
+ implements JoinFunction<Edge<K, Tuple2<EV, D>>, Vertex<K, D>, Edge<K, Tuple3<EV, D, D>>> {
+ private Tuple3<EV, D, D> valueAndDegrees = new Tuple3<>();
- private Edge<K, Tuple3<EV, LongValue, LongValue>> output = new Edge<>(null, null, valueAndDegrees);
+ private Edge<K, Tuple3<EV, D, D>> output = new Edge<>(null, null, valueAndDegrees);
@Override
- public Edge<K, Tuple3<EV, LongValue, LongValue>> join(Edge<K, Tuple2<EV, LongValue>> edge, Vertex<K, LongValue> vertex)
+ public Edge<K, Tuple3<EV, D, D>> join(Edge<K, Tuple2<EV, D>> edge, Vertex<K, D> vertex)
throws Exception {
- Tuple2<EV, LongValue> valueAndDegree = edge.f2;
+ Tuple2<EV, D> valueAndDegree = edge.f2;
output.f0 = edge.f0;
output.f1 = edge.f1;
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
new file mode 100644
index 0000000..5aa5ca4
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of both the source and target vertices.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeDegreesPair<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> {
+
+ // Optional configuration
+ private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+ /**
+ * Override the operator parallelism.
+ *
+ * @param parallelism operator parallelism
+ * @return this
+ */
+ public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+
+ return this;
+ }
+
+ @Override
+ public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> run(Graph<K, VV, EV> input)
+ throws Exception {
+ // s, t, d(s)
+ DataSet<Edge<K, Tuple2<EV, Degrees>>> edgeSourceDegrees = input
+ .run(new EdgeSourceDegrees<K, VV, EV>()
+ .setParallelism(parallelism));
+
+ // t, d(t)
+ DataSet<Vertex<K, Degrees>> vertexDegrees = input
+ .run(new VertexDegrees<K, VV, EV>()
+ .setParallelism(parallelism));
+
+ // s, t, (d(s), d(t))
+ return edgeSourceDegrees
+ .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
+ .where(1)
+ .equalTo(0)
+ .with(new JoinEdgeDegreeWithVertexDegree<K, EV, Degrees>())
+ .setParallelism(parallelism)
+ .name("Edge target degree");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
new file mode 100644
index 0000000..85ba0ed
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of the source vertex.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeSourceDegrees<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
+
+ // Optional configuration
+ private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+ /**
+ * Override the operator parallelism.
+ *
+ * @param parallelism operator parallelism
+ * @return this
+ */
+ public EdgeSourceDegrees<K, VV, EV> setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+
+ return this;
+ }
+
+ @Override
+ public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input)
+ throws Exception {
+ // s, d(s)
+ DataSet<Vertex<K, Degrees>> vertexDegrees = input
+ .run(new VertexDegrees<K, VV, EV>()
+ .setParallelism(parallelism));
+
+ // s, t, d(s)
+ return input.getEdges()
+ .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
+ .where(0)
+ .equalTo(0)
+ .with(new JoinEdgeWithVertexDegree<K, EV, Degrees>())
+ .setParallelism(parallelism)
+ .name("Edge source degrees");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
new file mode 100644
index 0000000..6d72e44
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+
+/**
+ * Annotates edges of a directed graph with the degree, out-degree, and
+ * in-degree of the target vertex.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class EdgeTargetDegrees<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
+
+ // Optional configuration
+ private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+ /**
+ * Override the operator parallelism.
+ *
+ * @param parallelism operator parallelism
+ * @return this
+ */
+ public EdgeTargetDegrees<K, VV, EV> setParallelism(int parallelism) {
+ this.parallelism = parallelism;
+
+ return this;
+ }
+
+ @Override
+ public DataSet<Edge<K, Tuple2<EV, Degrees>>> run(Graph<K, VV, EV> input)
+ throws Exception {
+ // t, d(t)
+ DataSet<Vertex<K, Degrees>> vertexDegrees = input
+ .run(new VertexDegrees<K, VV, EV>()
+ .setParallelism(parallelism));
+
+ // s, t, d(t)
+ return input.getEdges()
+ .join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
+ .where(1)
+ .equalTo(0)
+ .with(new JoinEdgeWithVertexDegree<K, EV, Degrees>())
+ .setParallelism(parallelism)
+ .name("Edge target degrees");
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java
deleted file mode 100644
index 3e2cae0..0000000
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePair.java
+++ /dev/null
@@ -1,107 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.degree.annotate.directed;
-
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.GraphAlgorithm;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexDegreeWithVertexDegree;
-import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegrees;
-import org.apache.flink.types.LongValue;
-
-/**
- * Annotates vertices of a directed graph with both the out-degree and in-degree.
- *
- * @param <K> ID type
- * @param <VV> vertex value type
- * @param <EV> edge value type
- */
-public class VertexDegreePair<K, VV, EV>
-implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Tuple2<LongValue, LongValue>>>> {
-
- // Optional configuration
- private boolean includeZeroDegreeVertices = false;
-
- private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
-
- /**
- * By default only the edge set is processed for the computation of degree.
- * When this flag is set an additional join is performed against the vertex
- * set in order to output vertices with out- and in-degree of zero.
- *
- * @param includeZeroDegreeVertices whether to output vertices with out-
- * and in-degree of zero
- * @return this
- */
- public VertexDegreePair<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
- this.includeZeroDegreeVertices = includeZeroDegreeVertices;
-
- return this;
- }
-
- /**
- * Override the operator parallelism.
- *
- * @param parallelism operator parallelism
- * @return this
- */
- public VertexDegreePair<K, VV, EV> setParallelism(int parallelism) {
- this.parallelism = parallelism;
-
- return this;
- }
-
- @Override
- public DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> run(Graph<K, VV, EV> input)
- throws Exception {
- // s, deg(s)
- // NOTE(review): the out- and in-degree sub-algorithms are not given this
- // algorithm's parallelism; only the joins below use it
- DataSet<Vertex<K, LongValue>> outDegree = input
- .run(new VertexOutDegree<K, VV, EV>()
- .setIncludeZeroDegreeVertices(false));
-
- // t, deg(t)
- DataSet<Vertex<K, LongValue>> inDegree = input
- .run(new VertexInDegree<K, VV, EV>()
- .setIncludeZeroDegreeVertices(false));
-
- // a full outer join keeps vertices that have only out-edges or only
- // in-edges; presumably JoinVertexDegreeWithVertexDegree substitutes zero
- // for the missing side (TODO confirm)
- DataSet<Vertex<K, Tuple2<LongValue, LongValue>>> degree = outDegree
- .fullOuterJoin(inDegree)
- .where(0)
- .equalTo(0)
- .with(new JoinVertexDegreeWithVertexDegree<K>())
- .setParallelism(parallelism)
- .name("Join out- and in-degree");
-
- if (includeZeroDegreeVertices) {
- // vertices with no edges are absent from the edge-derived degrees, so
- // they are recovered from the vertex set with a left outer join
- degree = input
- .getVertices()
- .leftOuterJoin(degree)
- .where(0)
- .equalTo(0)
- .with(new JoinVertexWithVertexDegrees<K, VV>())
- .setParallelism(parallelism)
- .name("Join zero degree vertices");
- }
-
- return degree;
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
new file mode 100644
index 0000000..aab0eb6
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.functions.GroupReduceFunction;
+import org.apache.flink.api.common.functions.JoinFunction;
+import org.apache.flink.api.common.functions.ReduceFunction;
+import org.apache.flink.api.common.operators.Order;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsSecond;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.EdgeOrder;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.graph.utils.Murmur3_32;
+import org.apache.flink.types.ByteValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+
+/**
+ * Annotates vertices of a directed graph with the degree, out-, and in-degree.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class VertexDegrees<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
+
+	// Optional configuration
+	private boolean includeZeroDegreeVertices = false;
+
+	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+
+	/**
+	 * By default only the edge set is processed for the computation of degree.
+	 * When this flag is set an additional join is performed against the vertex
+	 * set in order to output vertices with out- and in-degree of zero.
+	 *
+	 * @param includeZeroDegreeVertices whether to output vertices with out-
+	 *                                  and in-degree of zero
+	 * @return this
+	 */
+	public VertexDegrees<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
+		this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+
+		return this;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public VertexDegrees<K, VV, EV> setParallelism(int parallelism) {
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public DataSet<Vertex<K, Degrees>> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// s, t, bitmask: every edge is emitted once per direction, then the
+		// records of each (s, t) pair are OR-ed together so that each neighbor
+		// of s is represented by exactly one record
+		DataSet<Tuple3<K, K, ByteValue>> edgesWithOrder = input.getEdges()
+			.flatMap(new EmitAndFlipEdge<K, EV>())
+			.setParallelism(parallelism)
+			.name("Emit and flip edge")
+			.groupBy(0, 1)
+			.reduce(new ReduceBitmask<K>())
+			.setParallelism(parallelism)
+			.name("Reduce bitmask");
+
+		// s, d(s)
+		// NOTE(review): DegreeCount only counts records, so the group sort on
+		// the neighbor ID does not influence the result
+		DataSet<Vertex<K, Degrees>> vertexDegrees = edgesWithOrder
+			.groupBy(0)
+			.sortGroup(1, Order.ASCENDING)
+			.reduceGroup(new DegreeCount<K>())
+			.setParallelism(parallelism)
+			.name("Degree count");
+
+		if (includeZeroDegreeVertices) {
+			// vertices without any edge are missing from the edge-derived
+			// degrees; the left outer join gives them all-zero Degrees
+			vertexDegrees = input.getVertices()
+				.leftOuterJoin(vertexDegrees)
+				.where(0)
+				.equalTo(0)
+				.with(new JoinVertexWithVertexDegrees<K, VV>())
+				.setParallelism(parallelism)
+				.name("Join zero degree vertices");
+		}
+
+		return vertexDegrees;
+	}
+
+	/**
+	 * Emit each edge both forward and reversed with the associated bitmask.
+	 *
+	 * @param <T> ID type
+	 * @param <TV> edge value type
+	 */
+	private static class EmitAndFlipEdge<T, TV>
+	implements FlatMapFunction<Edge<T, TV>, Tuple3<T, T, ByteValue>> {
+		// output tuples are reused across calls; only the IDs change per edge
+		private Tuple3<T, T, ByteValue> forward = new Tuple3<>(null, null, new ByteValue(EdgeOrder.FORWARD.getBitmask()));
+
+		private Tuple3<T, T, ByteValue> reverse = new Tuple3<>(null, null, new ByteValue(EdgeOrder.REVERSE.getBitmask()));
+
+		@Override
+		public void flatMap(Edge<T, TV> value, Collector<Tuple3<T, T, ByteValue>> out)
+				throws Exception {
+			forward.f0 = value.f0;
+			forward.f1 = value.f1;
+			out.collect(forward);
+
+			reverse.f0 = value.f1;
+			reverse.f1 = value.f0;
+			out.collect(reverse);
+		}
+	}
+
+	/**
+	 * Combine mutual edges by OR-ing the bitmasks of all records that share
+	 * the same (source, target) pair.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class ReduceBitmask<T>
+	implements ReduceFunction<Tuple3<T, T, ByteValue>> {
+		@Override
+		public Tuple3<T, T, ByteValue> reduce(Tuple3<T, T, ByteValue> left, Tuple3<T, T, ByteValue> right)
+				throws Exception {
+			left.f2.setValue((byte)(left.f2.getValue() | right.f2.getValue()));
+			return left;
+		}
+	}
+
+	/**
+	 * Sum vertex degree by counting over mutual, out-, and in-edges.
+	 *
+	 * <p>Each input record represents one distinct neighbor, so the record
+	 * count is the degree; the bitmask decides which of out-/in-degree grows.
+	 *
+	 * @param <T> ID type
+	 */
+	private static class DegreeCount<T>
+	implements GroupReduceFunction<Tuple3<T, T, ByteValue>, Vertex<T, Degrees>> {
+		private Vertex<T, Degrees> output = new Vertex<>(null, new Degrees());
+
+		@Override
+		public void reduce(Iterable<Tuple3<T, T, ByteValue>> values, Collector<Vertex<T, Degrees>> out)
+				throws Exception {
+			long degree = 0;
+			long outDegree = 0;
+			long inDegree = 0;
+
+			for (Tuple3<T, T, ByteValue> edge : values) {
+				output.f0 = edge.f0;
+
+				byte bitmask = edge.f2.getValue();
+
+				degree++;
+
+				if (bitmask == EdgeOrder.FORWARD.getBitmask()) {
+					outDegree++;
+				} else if (bitmask == EdgeOrder.REVERSE.getBitmask()) {
+					inDegree++;
+				} else {
+					// any other bitmask is the OR of FORWARD and REVERSE produced
+					// by ReduceBitmask (assumes the two are distinct bits), i.e. a
+					// mutual edge counting toward both out- and in-degree
+					outDegree++;
+					inDegree++;
+				}
+			}
+
+			output.f1.getDegree().setValue(degree);
+			output.f1.getOutDegree().setValue(outDegree);
+			output.f1.getInDegree().setValue(inDegree);
+
+			out.collect(output);
+		}
+	}
+
+	/**
+	 * Performs a left outer join to apply a zero count for vertices with
+	 * out- and in-degree of zero.
+	 *
+	 * @param <T> ID type
+	 * @param <TV> vertex value type
+	 */
+	@ForwardedFieldsFirst("0")
+	@ForwardedFieldsSecond("0")
+	private static class JoinVertexWithVertexDegrees<T, TV>
+	implements JoinFunction<Vertex<T, TV>, Vertex<T, Degrees>, Vertex<T, Degrees>> {
+		// the Degrees of this reused output are never modified, so they stay zero
+		private Vertex<T, Degrees> output = new Vertex<>(null, new Degrees());
+
+		@Override
+		public Vertex<T, Degrees> join(Vertex<T, TV> vertex, Vertex<T, Degrees> vertexDegree)
+				throws Exception {
+			if (vertexDegree == null) {
+				output.f0 = vertex.f0;
+				return output;
+			} else {
+				return vertexDegree;
+			}
+		}
+	}
+
+	/**
+	 * Wraps the vertex degree, out-degree, and in-degree.
+	 */
+	public static class Degrees
+	extends Tuple3<LongValue, LongValue, LongValue> {
+		private static final int HASH_SEED = 0x3a12fc31;
+
+		// Created lazily and excluded from serialization: a hasher is only needed
+		// when hashCode() is called, not for every Degrees instance, and it must
+		// not be shipped along with UDFs that hold a Degrees object.
+		private transient Murmur3_32 hasher;
+
+		public Degrees() {
+			this(new LongValue(), new LongValue(), new LongValue());
+		}
+
+		public Degrees(LongValue value0, LongValue value1, LongValue value2) {
+			super(value0, value1, value2);
+		}
+
+		/** @return the number of distinct neighbors (field 0) */
+		public LongValue getDegree() {
+			return f0;
+		}
+
+		/** @return the number of neighbors reached by an out-edge (field 1) */
+		public LongValue getOutDegree() {
+			return f1;
+		}
+
+		/** @return the number of neighbors reached by an in-edge (field 2) */
+		public LongValue getInDegree() {
+			return f2;
+		}
+
+		@Override
+		public int hashCode() {
+			if (hasher == null) {
+				hasher = new Murmur3_32(HASH_SEED);
+			}
+
+			return hasher.reset()
+				.hash(f0.getValue())
+				.hash(f1.getValue())
+				.hash(f2.getValue())
+				.hash();
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 5825628..00acedb 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -39,7 +39,7 @@ public class VertexInDegree<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
// Optional configuration
- private boolean includeZeroDegreeVertices = true;
+ private boolean includeZeroDegreeVertices = false;
private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
@@ -80,7 +80,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
.setParallelism(parallelism)
.name("Map edge to target ID");
- // t, deg(t)
+ // t, d(t)
DataSet<Vertex<K, LongValue>> targetDegree = targetIds
.groupBy(0)
.reduce(new DegreeCount<K>())
@@ -88,8 +88,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
.name("Degree count");
if (includeZeroDegreeVertices) {
- targetDegree = input
- .getVertices()
+ targetDegree = input.getVertices()
.leftOuterJoin(targetDegree)
.where(0)
.equalTo(0)
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index 2676104..bcca19d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -39,7 +39,7 @@ public class VertexOutDegree<K, VV, EV>
implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
// Optional configuration
- private boolean includeZeroDegreeVertices = true;
+ private boolean includeZeroDegreeVertices = false;
private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
@@ -80,7 +80,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
.setParallelism(parallelism)
.name("Map edge to source ID");
- // s, deg(s)
+ // s, d(s)
DataSet<Vertex<K, LongValue>> sourceDegree = sourceIds
.groupBy(0)
.reduce(new DegreeCount<K>())
@@ -88,8 +88,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
.name("Degree count");
if (includeZeroDegreeVertices) {
- sourceDegree = input
- .getVertices()
+ sourceDegree = input.getVertices()
.leftOuterJoin(sourceDegree)
.where(0)
.equalTo(0)
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java
index f25fd90..a138b1a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/package-info.java
@@ -24,18 +24,25 @@
* equivalent to the out-degree.
*
* The undirected graph algorithms are:
- * {@code VertexDegree} annotates vertices as <v, deg(v)>
- * {@code EdgeSourceDegree} annotates edges as <s, t, deg(s)>
- * {@code EdgeTargetDegree} annotates edges as <s, t, deg(t)>
- * {@code EdgeDegreePair} annotates edges as <s, t, (deg(s), deg(t))>
+ * {@code VertexDegree} annotates vertices as <v, deg(v)>
+ * {@code EdgeSourceDegree} annotates edges as <s, t, (EV, deg(s))>
+ * {@code EdgeTargetDegree} annotates edges as <s, t, (EV, deg(t))>
+ * {@code EdgeDegreePair} annotates edges as <s, t, (EV, deg(s), deg(t))>
*
* The directed graph algorithms are:
- * {@code VertexOutDegree} annotates vertices as <v, out(v)>
- * {@code VertexInDegree} annotates vertices as <v, in(v)>
- * {@code VertexDegreePair} annotates vertices as <v, (out(v), in(v))>
+ * {@code VertexDegrees} annotates vertices as <v, (deg(v), out(v), in(v))>
+ * {@code VertexOutDegree} annotates vertices as <v, out(v)>
+ * {@code VertexInDegree} annotates vertices as <v, in(v)>
+ * {@code EdgeSourceDegrees} annotates edges as <s, t, (deg(s), out(s), in(s))>
+ * {@code EdgeTargetDegrees} annotates edges as <s, t, (deg(t), out(t), in(t))>
+ * {@code EdgeDegreesPair} annotates edges as <s, t, ((deg(s), out(s), in(s)), (deg(t), out(t), in(t)))>
*
- * A directed graph edge has four possible degrees: source out- and in-degree
- * and target out- and in-degree. This gives 2^4 - 1 = 15 ways to annotate
- * a directed edge.
+ * where:
+ * EV is the original edge value
+ * deg(x) is the number of vertex neighbors
+ * out(x) is the number of vertex neighbors connected by an out-edge
+ * in(x) is the number of vertex neighbors connected by an in-edge
+ *
+ * (out(x) + in(x)) / 2 <= max(out(x), in(x)) <= deg(x) <= out(x) + in(x)
*/
package org.apache.flink.graph.asm.degree.annotate;
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index 698a46a..8cc2e08 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -31,7 +31,8 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Join
import org.apache.flink.types.LongValue;
/**
- * Annotates edges of an undirected graph with the degree of both the source and target degree ID.
+ * Annotates edges of an undirected graph with the degree of both the source
+ * and target vertices.
*
* @param <K> ID type
* @param <VV> vertex value type
@@ -75,7 +76,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV
@Override
public DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> run(Graph<K, VV, EV> input)
throws Exception {
- // s, t, deg(s)
+ // s, t, d(s)
DataSet<Edge<K, Tuple2<EV, LongValue>>> edgeSourceDegrees = input
.run(new EdgeSourceDegree<K, VV, EV>()
.setReduceOnTargetId(reduceOnTargetId)
@@ -87,12 +88,12 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV
.setReduceOnTargetId(reduceOnTargetId)
.setParallelism(parallelism));
- // s, t, (deg(s), deg(t))
+ // s, t, (d(s), d(t))
return edgeSourceDegrees
.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
.where(1)
.equalTo(0)
- .with(new JoinEdgeDegreeWithVertexDegree<K,EV>())
+ .with(new JoinEdgeDegreeWithVertexDegree<K, EV, LongValue>())
.setParallelism(parallelism)
.name("Edge target degree");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index 6f64bc1..b9b59c5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Join
import org.apache.flink.types.LongValue;
/**
- * Annotates edges of an undirected graph with degree of the source ID.
+ * Annotates edges of an undirected graph with degree of the source vertex.
*
* @param <K> ID type
* @param <VV> vertex value type
@@ -85,7 +85,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
.where(0)
.equalTo(0)
- .with(new JoinEdgeWithVertexDegree<K, EV>())
+ .with(new JoinEdgeWithVertexDegree<K, EV, LongValue>())
.setParallelism(parallelism)
.name("Edge source degree");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index e9505bc..eabfee7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -30,7 +30,7 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Join
import org.apache.flink.types.LongValue;
/**
- * Annotates edges of an undirected graph with degree of the target ID.
+ * Annotates edges of an undirected graph with degree of the target vertex.
*
* @param <K> ID type
* @param <VV> vertex value type
@@ -85,7 +85,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
.join(vertexDegrees, JoinHint.REPARTITION_HASH_SECOND)
.where(1)
.equalTo(0)
- .with(new JoinEdgeWithVertexDegree<K, EV>())
+ .with(new JoinEdgeWithVertexDegree<K, EV, LongValue>())
.setParallelism(parallelism)
.name("Edge target degree");
}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index 518afaa..61a5e82 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -111,8 +111,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
.name("Degree count");
if (includeZeroDegreeVertices) {
- degree = input
- .getVertices()
+ degree = input.getVertices()
.leftOuterJoin(degree)
.where(0)
.equalTo(0)
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
new file mode 100644
index 0000000..3fcb9dd
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link EdgeDegreesPair}. Each expected edge value has the form
+ * (edge value, source degrees, target degrees), where each degrees triple is
+ * (degree, out-degree, in-degree).
+ */
+public class EdgeDegreesPairTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		DataSet<Edge<IntValue, Tuple3<NullValue, Degrees, Degrees>>> degreePairs = directedSimpleGraph
+			.run(new EdgeDegreesPair<IntValue, NullValue, NullValue>());
+
+		String expected =
+			"(0,1,((null),(2,2,0),(3,2,1)))\n" +
+			"(0,2,((null),(2,2,0),(3,1,2)))\n" +
+			"(1,2,((null),(3,2,1),(3,1,2)))\n" +
+			"(1,3,((null),(3,2,1),(4,2,2)))\n" +
+			"(2,3,((null),(3,1,2),(4,2,2)))\n" +
+			"(3,4,((null),(4,2,2),(1,0,1)))\n" +
+			"(3,5,((null),(4,2,2),(1,0,1)))";
+
+		TestBaseUtils.compareResultAsText(degreePairs.collect(), expected);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreePairs = directedRMatGraph
+			.run(new EdgeDegreesPair<LongValue, NullValue, NullValue>());
+
+		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(degreePairs);
+
+		assertEquals(16384, checksum.getCount());
+		assertEquals(0x00001f68dfabd17cL, checksum.getChecksum());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
new file mode 100644
index 0000000..2b22eea
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for {@link EdgeSourceDegrees}. Each expected edge value has the form
+ * (edge value, source degrees), where the degrees triple is
+ * (degree, out-degree, in-degree).
+ */
+public class EdgeSourceDegreesTest
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> sourceDegrees = directedSimpleGraph
+			.run(new EdgeSourceDegrees<IntValue, NullValue, NullValue>());
+
+		String expected =
+			"(0,1,((null),(2,2,0)))\n" +
+			"(0,2,((null),(2,2,0)))\n" +
+			"(1,2,((null),(3,2,1)))\n" +
+			"(1,3,((null),(3,2,1)))\n" +
+			"(2,3,((null),(3,1,2)))\n" +
+			"(3,4,((null),(4,2,2)))\n" +
+			"(3,5,((null),(4,2,2)))";
+
+		TestBaseUtils.compareResultAsText(sourceDegrees.collect(), expected);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = directedRMatGraph
+			.run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>());
+
+		ChecksumHashCode checksum = DataSetUtils.checksumHashCode(sourceDegrees);
+
+		assertEquals(16384, checksum.getCount());
+		assertEquals(0x00001ec53bd55136L, checksum.getChecksum());
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
new file mode 100644
index 0000000..6840dc5
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class EdgeTargetDegreesTest
+extends AsmTestBase {
+
+ @Test
+ public void testWithSimpleGraph()
+ throws Exception {
+ DataSet<Edge<IntValue, Tuple2<NullValue, Degrees>>> annotatedEdges =
+ directedSimpleGraph.run(new EdgeTargetDegrees<IntValue, NullValue, NullValue>());
+
+ // each edge carries the (degree, out-degree, in-degree) of its target vertex
+ String expected = "(0,1,((null),(3,2,1)))\n"
+ + "(0,2,((null),(3,1,2)))\n"
+ + "(1,2,((null),(3,1,2)))\n"
+ + "(1,3,((null),(4,2,2)))\n"
+ + "(2,3,((null),(4,2,2)))\n"
+ + "(3,4,((null),(1,0,1)))\n"
+ + "(3,5,((null),(1,0,1)))";
+ TestBaseUtils.compareResultAsText(annotatedEdges.collect(), expected);
+ }
+
+ @Test
+ public void testWithRMatGraph()
+ throws Exception {
+ DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> annotatedEdges =
+ directedRMatGraph.run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>());
+ ChecksumHashCode checksum = DataSetUtils.checksumHashCode(annotatedEdges);
+
+ assertEquals(16384, checksum.getCount());
+ assertEquals(0x00001f2867ba8b4fL, checksum.getChecksum());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java
deleted file mode 100644
index 9400532..0000000
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreePairTest.java
+++ /dev/null
@@ -1,87 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.graph.asm.degree.annotate.directed;
-
-import org.apache.flink.api.java.DataSet;
-import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.api.java.utils.DataSetUtils;
-import org.apache.flink.graph.Vertex;
-import org.apache.flink.graph.asm.AsmTestBase;
-import org.apache.flink.test.util.TestBaseUtils;
-import org.apache.flink.types.IntValue;
-import org.apache.flink.types.LongValue;
-import org.apache.flink.types.NullValue;
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class VertexDegreePairTest
-extends AsmTestBase {
-
- @Test
- public void testWithSimpleGraph()
- throws Exception {
- DataSet<Vertex<IntValue, Tuple2<LongValue, LongValue>>> vertexDegrees = directedSimpleGraph
- .run(new VertexDegreePair<IntValue, NullValue, NullValue>());
-
- String expectedResult =
- "(0,(2,0))\n" +
- "(1,(2,1))\n" +
- "(2,(1,2))\n" +
- "(3,(2,2))\n" +
- "(4,(0,1))\n" +
- "(5,(0,1))";
-
- TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
- }
-
- @Test
- public void testWithEmptyGraph()
- throws Exception {
- DataSet<Vertex<LongValue, Tuple2<LongValue, LongValue>>> vertexDegrees;
-
- vertexDegrees = emptyGraph
- .run(new VertexDegreePair<LongValue, NullValue, NullValue>()
- .setIncludeZeroDegreeVertices(false));
-
- assertEquals(0, vertexDegrees.collect().size());
-
- vertexDegrees = emptyGraph
- .run(new VertexDegreePair<LongValue, NullValue, NullValue>()
- .setIncludeZeroDegreeVertices(true));
-
- String expectedResult =
- "(0,(0,0))\n" +
- "(1,(0,0))\n" +
- "(2,(0,0))";
-
- TestBaseUtils.compareResultAsText(vertexDegrees.collect(), expectedResult);
- }
-
- @Test
- public void testWithRMatGraph()
- throws Exception {
- ChecksumHashCode degreePairChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
- .run(new VertexDegreePair<LongValue, NullValue, NullValue>()));
-
- assertEquals(902, degreePairChecksum.getCount());
- assertEquals(0x0000000000fc025aL, degreePairChecksum.getChecksum());
- }
-}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
new file mode 100644
index 0000000..a0697a2
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.annotate.directed;
+
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.api.java.utils.DataSetUtils;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class VertexDegreesTest
+extends AsmTestBase {
+
+ @Test
+ public void testWithSimpleDirectedGraph()
+ throws Exception {
+ // each vertex is annotated with (degree, out-degree, in-degree)
+ String expected = "(0,(2,2,0))\n"
+ + "(1,(3,2,1))\n"
+ + "(2,(3,1,2))\n"
+ + "(3,(4,2,2))\n"
+ + "(4,(1,0,1))\n"
+ + "(5,(1,0,1))";
+
+ DataSet<Vertex<IntValue, Degrees>> annotated =
+ directedSimpleGraph.run(new VertexDegrees<IntValue, NullValue, NullValue>());
+
+ TestBaseUtils.compareResultAsText(annotated.collect(), expected);
+ }
+
+ @Test
+ public void testWithSimpleUndirectedGraph()
+ throws Exception {
+ // for the undirected graph all three counts are equal
+ String expected = "(0,(2,2,2))\n"
+ + "(1,(3,3,3))\n"
+ + "(2,(3,3,3))\n"
+ + "(3,(4,4,4))\n"
+ + "(4,(1,1,1))\n"
+ + "(5,(1,1,1))";
+
+ DataSet<Vertex<IntValue, Degrees>> annotated =
+ undirectedSimpleGraph.run(new VertexDegrees<IntValue, NullValue, NullValue>());
+
+ TestBaseUtils.compareResultAsText(annotated.collect(), expected);
+ }
+
+ @Test
+ public void testWithEmptyGraph()
+ throws Exception {
+ // emptyGraph has no edges, so excluding zero-degree vertices leaves nothing
+ DataSet<Vertex<LongValue, Degrees>> withoutZeroDegree = emptyGraph
+ .run(new VertexDegrees<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(false));
+
+ assertEquals(0, withoutZeroDegree.collect().size());
+
+ // including zero-degree vertices reports every vertex with all-zero degrees
+ DataSet<Vertex<LongValue, Degrees>> withZeroDegree = emptyGraph
+ .run(new VertexDegrees<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(true));
+
+ String expected = "(0,(0,0,0))\n"
+ + "(1,(0,0,0))\n"
+ + "(2,(0,0,0))";
+ TestBaseUtils.compareResultAsText(withZeroDegree.collect(), expected);
+ }
+
+ @Test
+ public void testWithRMatGraph()
+ throws Exception {
+ // compare against the precomputed element count and checksum
+ DataSet<Vertex<LongValue, Degrees>> annotated =
+ directedRMatGraph.run(new VertexDegrees<LongValue, NullValue, NullValue>());
+ ChecksumHashCode checksum = DataSetUtils.checksumHashCode(annotated);
+
+ assertEquals(902, checksum.getCount());
+ assertEquals(0x0000015384f40cb6L, checksum.getChecksum());
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
index 577e675..0fa0fe5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.graph.asm.degree.annotate.directed;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.Utils.ChecksumHashCode;
-import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Vertex;
import org.apache.flink.graph.asm.AsmTestBase;
@@ -39,7 +38,8 @@ extends AsmTestBase {
public void testWithSimpleGraph()
throws Exception {
DataSet<Vertex<IntValue, LongValue>> vertexDegrees = directedSimpleGraph
- .run(new VertexInDegree<IntValue, NullValue, NullValue>());
+ .run(new VertexInDegree<IntValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(true));
String expectedResult =
"(0,0)\n" +
@@ -79,7 +79,8 @@ extends AsmTestBase {
public void testWithRMatGraph()
throws Exception {
ChecksumHashCode inDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
- .run(new VertexInDegree<LongValue, NullValue, NullValue>()));
+ .run(new VertexInDegree<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(true)));
assertEquals(902, inDegreeChecksum.getCount());
assertEquals(0x0000000000e1e99cL, inDegreeChecksum.getChecksum());
http://git-wip-us.apache.org/repos/asf/flink/blob/65545c2e/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
index b5e9ce8..f7f3d48 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
@@ -38,7 +38,8 @@ extends AsmTestBase {
public void testWithSimpleGraph()
throws Exception {
DataSet<Vertex<IntValue, LongValue>> vertexDegrees = directedSimpleGraph
- .run(new VertexOutDegree<IntValue, NullValue, NullValue>());
+ .run(new VertexOutDegree<IntValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(true));
String expectedResult =
"(0,2)\n" +
@@ -78,7 +79,8 @@ extends AsmTestBase {
public void testWithRMatGraph()
throws Exception {
ChecksumHashCode outDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
- .run(new VertexOutDegree<LongValue, NullValue, NullValue>()));
+ .run(new VertexOutDegree<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(true)));
assertEquals(902, outDegreeChecksum.getCount());
assertEquals(0x0000000000e1e99cL, outDegreeChecksum.getChecksum());