You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/09/15 13:58:34 UTC
[3/4] flink git commit: [FLINK-7273] [gelly] Gelly tests with empty graphs
[FLINK-7273] [gelly] Gelly tests with empty graphs
Some tests already used empty graphs, but the `EmptyGraph` in
`AsmTestBase` contained vertices and no edges. Add a new `EmptyGraph`
without vertices and test both empty graphs for each algorithm.
EmptyGraph now generates the proper TypeInformation (for Edge<>, not
Tuple3); the old Tuple3 type information had prevented adding edges due
to a union incompatibility.
GraphGeneratorUtils#vertexSet now uses a hash-combine for distinct.
`PageRank` optionally includes zero-degree vertices in the results
(at a performance cost).
This closes #4405
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9437a0ff
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9437a0ff
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9437a0ff
Branch: refs/heads/master
Commit: 9437a0ffc04318f6a1a2d19c59f2ae6651b26507
Parents: 6612c0e
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Jul 26 06:23:52 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Thu Sep 14 23:25:25 2017 -0400
----------------------------------------------------------------------
docs/dev/libs/gelly/library_methods.md | 1 +
.../apache/flink/graph/drivers/PageRank.java | 6 +-
.../flink/graph/drivers/input/GridGraph.java | 1 -
.../flink/graph/generator/CirculantGraph.java | 2 +-
.../flink/graph/generator/CompleteGraph.java | 2 +-
.../flink/graph/generator/CycleGraph.java | 2 +-
.../apache/flink/graph/generator/EchoGraph.java | 4 +-
.../flink/graph/generator/EmptyGraph.java | 14 ++-
.../graph/generator/GraphGeneratorUtils.java | 46 ++++++---
.../apache/flink/graph/generator/GridGraph.java | 2 +-
.../flink/graph/generator/HypercubeGraph.java | 4 +-
.../apache/flink/graph/generator/PathGraph.java | 2 +-
.../graph/generator/SingletonEdgeGraph.java | 6 +-
.../apache/flink/graph/generator/StarGraph.java | 8 +-
.../directed/AverageClusteringCoefficient.java | 5 +
.../directed/GlobalClusteringCoefficient.java | 5 +
.../clustering/directed/TriadicCensus.java | 5 +
.../AverageClusteringCoefficient.java | 5 +
.../undirected/GlobalClusteringCoefficient.java | 5 +
.../clustering/undirected/TriadicCensus.java | 5 +
.../flink/graph/library/linkanalysis/HITS.java | 12 ++-
.../graph/library/linkanalysis/PageRank.java | 50 ++++++++-
.../library/metric/directed/EdgeMetrics.java | 5 +
.../library/metric/directed/VertexMetrics.java | 5 +
.../library/metric/undirected/EdgeMetrics.java | 5 +
.../metric/undirected/VertexMetrics.java | 5 +
.../graph/library/similarity/AdamicAdar.java | 4 +-
.../org/apache/flink/graph/asm/AsmTestBase.java | 25 ++++-
.../graph/asm/dataset/ChecksumHashCodeTest.java | 19 +++-
.../flink/graph/asm/dataset/CollectTest.java | 19 +++-
.../flink/graph/asm/dataset/CountTest.java | 18 +++-
.../annotate/directed/EdgeDegreesPairTest.java | 25 +++--
.../directed/EdgeSourceDegreesTest.java | 25 +++--
.../directed/EdgeTargetDegreesTest.java | 25 +++--
.../annotate/directed/VertexDegreesTest.java | 40 ++++---
.../annotate/directed/VertexInDegreeTest.java | 52 +++++++---
.../annotate/directed/VertexOutDegreeTest.java | 52 +++++++---
.../annotate/undirected/EdgeDegreePairTest.java | 45 ++++++--
.../undirected/EdgeSourceDegreeTest.java | 45 ++++++--
.../undirected/EdgeTargetDegreeTest.java | 45 ++++++--
.../annotate/undirected/VertexDegreeTest.java | 45 +++++---
.../filter/undirected/MaximumDegreeTest.java | 28 +++--
.../graph/asm/simple/directed/SimplifyTest.java | 3 +-
.../asm/simple/undirected/SimplifyTest.java | 6 +-
.../graph/asm/translate/TranslateTest.java | 9 +-
.../graph/generator/CirculantGraphTest.java | 12 +--
.../graph/generator/CompleteGraphTest.java | 12 +--
.../flink/graph/generator/CycleGraphTest.java | 12 +--
.../flink/graph/generator/EchoGraphTest.java | 24 ++---
.../flink/graph/generator/EmptyGraphTest.java | 12 +--
.../flink/graph/generator/GridGraphTest.java | 12 +--
.../graph/generator/HypercubeGraphTest.java | 12 +--
.../flink/graph/generator/PathGraphTest.java | 12 +--
.../flink/graph/generator/RMatGraphTest.java | 9 +-
.../graph/generator/SingletonEdgeGraphTest.java | 12 +--
.../flink/graph/generator/StarGraphTest.java | 12 +--
.../apache/flink/graph/generator/TestUtils.java | 9 +-
.../AverageClusteringCoefficientTest.java | 74 ++++++-------
.../GlobalClusteringCoefficientTest.java | 73 ++++++-------
.../LocalClusteringCoefficientTest.java | 60 +++++++----
.../clustering/directed/TriadicCensusTest.java | 34 +++---
.../directed/TriangleListingTest.java | 28 +++--
.../AverageClusteringCoefficientTest.java | 74 ++++++-------
.../GlobalClusteringCoefficientTest.java | 73 ++++++-------
.../LocalClusteringCoefficientTest.java | 60 +++++++----
.../undirected/TriadicCensusTest.java | 28 +++--
.../undirected/TriangleListingTest.java | 31 ++++--
.../graph/library/linkanalysis/HITSTest.java | 103 +++++++++++--------
.../library/linkanalysis/PageRankTest.java | 97 ++++++++++-------
.../library/metric/ChecksumHashCodeTest.java | 27 ++++-
.../metric/directed/EdgeMetricsTest.java | 70 ++++++-------
.../metric/directed/VertexMetricsTest.java | 95 ++++++++---------
.../metric/undirected/EdgeMetricsTest.java | 70 ++++++-------
.../metric/undirected/VertexMetricsTest.java | 95 ++++++++---------
.../library/similarity/AdamicAdarTest.java | 76 +++++++++++---
.../library/similarity/JaccardIndexTest.java | 89 ++++++++++++----
76 files changed, 1320 insertions(+), 829 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/docs/dev/libs/gelly/library_methods.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/library_methods.md b/docs/dev/libs/gelly/library_methods.md
index de53aaf..93a2c5d 100644
--- a/docs/dev/libs/gelly/library_methods.md
+++ b/docs/dev/libs/gelly/library_methods.md
@@ -330,6 +330,7 @@ The algorithm takes a simple directed graph as input and outputs a `DataSet` of
hub score, and authority score. Termination is configured by the number of iterations and/or a convergence threshold on
the iteration sum of the change in scores over all vertices.
+* `setIncludeZeroDegreeVertices`: whether to include zero-degree vertices in the iterative computation
* `setParallelism`: override the operator parallelism
### PageRank
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
index 299aeed..b9997e4 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
@@ -20,13 +20,14 @@ package org.apache.flink.graph.drivers;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
import org.apache.flink.graph.drivers.parameter.DoubleParameter;
import org.apache.flink.graph.drivers.parameter.IterationConvergence;
import org.apache.commons.lang3.text.StrBuilder;
/**
- * @see org.apache.flink.graph.library.linkanalysis.PageRank
+ * Driver for {@link org.apache.flink.graph.library.linkanalysis.PageRank}.
*/
public class PageRank<K, VV, EV>
extends DriverBase<K, VV, EV> {
@@ -40,6 +41,8 @@ extends DriverBase<K, VV, EV> {
private IterationConvergence iterationConvergence = new IterationConvergence(this, DEFAULT_ITERATIONS);
+ private BooleanParameter includeZeroDegreeVertices = new BooleanParameter(this, "__include_zero_degree_vertices");
+
@Override
public String getShortDescription() {
return "score vertices by the number and quality of incoming links";
@@ -63,6 +66,7 @@ extends DriverBase<K, VV, EV> {
dampingFactor.getValue(),
iterationConvergence.getValue().iterations,
iterationConvergence.getValue().convergenceThreshold)
+ .setIncludeZeroDegreeVertices(includeZeroDegreeVertices.getValue())
.setParallelism(parallelism.getValue().intValue()));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
index 1421b1d..fdb144f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
@@ -76,7 +76,6 @@ extends GeneratedGraph {
@Override
protected long vertexCount() {
- // in Java 8 use Math.multiplyExact(long, long)
BigInteger vertexCount = BigInteger.ONE;
for (Dimension dimension : dimensions) {
vertexCount = vertexCount.multiply(BigInteger.valueOf(dimension.size));
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
index 1dc9e66..0cb3edd 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CirculantGraph.java
@@ -51,7 +51,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
private final ExecutionEnvironment env;
// Required configuration
- private long vertexCount;
+ private final long vertexCount;
private List<OffsetRange> offsetRanges = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
index bf7dedf..46d8e67 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CompleteGraph.java
@@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
private final ExecutionEnvironment env;
// Required configuration
- private long vertexCount;
+ private final long vertexCount;
/**
* An undirected {@link Graph} connecting every distinct pair of vertices.
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
index 5b61fa8..5dad4c8 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/CycleGraph.java
@@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
private final ExecutionEnvironment env;
// Required configuration
- private long vertexCount;
+ private final long vertexCount;
/**
* An undirected {@link Graph} with {@code n} vertices where each vertex
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
index d834df1..4baeb25 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EchoGraph.java
@@ -46,9 +46,9 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
private final ExecutionEnvironment env;
// Required configuration
- private long vertexCount;
+ private final long vertexCount;
- private long vertexDegree;
+ private final long vertexDegree;
/**
* An undirected {@link Graph} whose vertices have the same degree.
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
index 466e2d3..30d0d2f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/EmptyGraph.java
@@ -18,12 +18,11 @@
package org.apache.flink.graph.generator;
+import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
-import org.apache.flink.api.java.typeutils.TupleTypeInfo;
-import org.apache.flink.api.java.typeutils.ValueTypeInfo;
import org.apache.flink.graph.Edge;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.Vertex;
@@ -39,13 +38,13 @@ import java.util.Collections;
public class EmptyGraph
extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
- public static final int MINIMUM_VERTEX_COUNT = 1;
+ public static final int MINIMUM_VERTEX_COUNT = 0;
// Required to create the DataSource
private final ExecutionEnvironment env;
// Required configuration
- private long vertexCount;
+ private final long vertexCount;
/**
* The {@link Graph} containing no edges.
@@ -63,15 +62,14 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
@Override
public Graph<LongValue, NullValue, NullValue> generate() {
+ Preconditions.checkState(vertexCount >= 0);
+
// Vertices
DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
// Edges
- TypeInformation<Edge<LongValue, NullValue>> typeInformation = new TupleTypeInfo<>(
- ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.LONG_VALUE_TYPE_INFO, ValueTypeInfo.NULL_VALUE_TYPE_INFO);
-
DataSource<Edge<LongValue, NullValue>> edges = env
- .fromCollection(Collections.<Edge<LongValue, NullValue>>emptyList(), typeInformation)
+ .fromCollection(Collections.<Edge<LongValue, NullValue>>emptyList(), TypeInformation.of(new TypeHint<Edge<LongValue, NullValue>>(){}))
.setParallelism(parallelism)
.name("Empty edge set");
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
index d5a70f3..61a27c4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorUtils.java
@@ -20,6 +20,9 @@ package org.apache.flink.graph.generator;
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.operators.base.ReduceOperatorBase.CombineHint;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
@@ -31,6 +34,9 @@ import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.apache.flink.util.Collector;
import org.apache.flink.util.LongValueSequenceIterator;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collections;
/**
* Utilities for graph generators.
@@ -45,26 +51,34 @@ public class GraphGeneratorUtils {
* @param env the Flink execution environment.
* @param parallelism operator parallelism
* @param vertexCount number of sequential vertex labels
- * @return {@link DataSet} of sequentially labeled {@link Vertex Vertices}
+ * @return {@link DataSet} of sequentially labeled {@link Vertex vertices}
*/
public static DataSet<Vertex<LongValue, NullValue>> vertexSequence(ExecutionEnvironment env, int parallelism, long vertexCount) {
- LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount - 1);
-
- DataSource<LongValue> vertexLabels = env
- .fromParallelCollection(iterator, LongValue.class)
- .setParallelism(parallelism)
- .name("Vertex iterators");
-
- return vertexLabels
- .map(new CreateVertex())
- .setParallelism(parallelism)
- .name("Vertex sequence");
+ Preconditions.checkArgument(vertexCount >= 0, "Vertex count must be non-negative");
+
+ if (vertexCount == 0) {
+ return env
+ .fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Vertex<LongValue, NullValue>>(){}))
+ .setParallelism(parallelism)
+ .name("Empty vertex set");
+ } else {
+ LongValueSequenceIterator iterator = new LongValueSequenceIterator(0, vertexCount - 1);
+
+ DataSource<LongValue> vertexLabels = env
+ .fromParallelCollection(iterator, LongValue.class)
+ .setParallelism(parallelism)
+ .name("Vertex indices");
+
+ return vertexLabels
+ .map(new CreateVertex())
+ .setParallelism(parallelism)
+ .name("Vertex sequence");
+ }
}
@ForwardedFields("*->f0")
private static class CreateVertex
implements MapFunction<LongValue, Vertex<LongValue, NullValue>> {
-
private Vertex<LongValue, NullValue> vertex = new Vertex<>(null, NullValue.getInstance());
@Override
@@ -79,13 +93,13 @@ public class GraphGeneratorUtils {
// --------------------------------------------------------------------------------------------
/**
- * Generates {@link Vertex Vertices} present in the given set of {@link Edge}s.
+ * Generates {@link Vertex vertices} present in the given set of {@link Edge}s.
*
* @param edges source {@link DataSet} of {@link Edge}s
* @param parallelism operator parallelism
* @param <K> label type
* @param <EV> edge value type
- * @return {@link DataSet} of discovered {@link Vertex Vertices}
+ * @return {@link DataSet} of discovered {@link Vertex vertices}
*
* @see Graph#fromDataSet(DataSet, DataSet, ExecutionEnvironment)
*/
@@ -97,6 +111,7 @@ public class GraphGeneratorUtils {
return vertexSet
.distinct()
+ .setCombineHint(CombineHint.HASH)
.setParallelism(parallelism)
.name("Emit vertex labels");
}
@@ -106,7 +121,6 @@ public class GraphGeneratorUtils {
*/
private static final class EmitSrcAndTarget<K, EV>
implements FlatMapFunction<Edge<K, EV>, Vertex<K, NullValue>> {
-
private Vertex<K, NullValue> output = new Vertex<>(null, new NullValue());
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
index cae2bc4..1d91b53 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GridGraph.java
@@ -71,7 +71,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
public GridGraph addDimension(long size, boolean wrapEndpoints) {
Preconditions.checkArgument(size >= 2, "Dimension size must be at least 2");
- vertexCount *= size;
+ vertexCount = Math.multiplyExact(vertexCount, size);
// prevent duplicate edges
if (size == 2) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
index daaaf53..0dc95e7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/HypercubeGraph.java
@@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
private final ExecutionEnvironment env;
// Required configuration
- private long dimensions;
+ private final long dimensions;
/**
* An undirected {@code Graph} where edges form an n-dimensional hypercube.
@@ -56,6 +56,8 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
@Override
public Graph<LongValue, NullValue, NullValue> generate() {
+ Preconditions.checkState(dimensions > 0);
+
GridGraph graph = new GridGraph(env);
for (int i = 0; i < dimensions; i++) {
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
index e61fcd8..bae8633 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/PathGraph.java
@@ -36,7 +36,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
private final ExecutionEnvironment env;
// Required configuration
- private long vertexCount;
+ private final long vertexCount;
/**
* An undirected {@link Graph} with {@code n} vertices where each vertex
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
index 159e55d..333c051 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/SingletonEdgeGraph.java
@@ -42,7 +42,7 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
private final ExecutionEnvironment env;
// Required configuration
- private long vertexPairCount;
+ private final long vertexPairCount;
/**
* An undirected {@link Graph} containing one or more isolated two-paths.
@@ -62,8 +62,10 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
@Override
public Graph<LongValue, NullValue, NullValue> generate() {
+ Preconditions.checkState(vertexPairCount > 0);
+
// Vertices
- long vertexCount = 2 * this.vertexPairCount;
+ long vertexCount = 2 * vertexPairCount;
DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
index 7133320..5cbb256 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/StarGraph.java
@@ -43,11 +43,11 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
private final ExecutionEnvironment env;
// Required configuration
- private long vertexCount;
+ private final long vertexCount;
/**
- * An undirected {@Graph} with {@code n} vertices where the single central
- * node has degree {@code n-1}, connecting to the other {@code n-1}
+ * An undirected {@link Graph} with {@code n} vertices where the single
+ * central node has degree {@code n-1}, connecting to the other {@code n-1}
* vertices which have degree {@code 1}.
*
* @param env the Flink execution environment
@@ -63,6 +63,8 @@ extends GraphGeneratorBase<LongValue, NullValue, NullValue> {
@Override
public Graph<LongValue, NullValue, NullValue> generate() {
+ Preconditions.checkState(vertexCount >= 2);
+
// Vertices
DataSet<Vertex<LongValue, NullValue>> vertices = GraphGeneratorUtils.vertexSequence(env, parallelism, vertexCount);
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
index 2ff5dc2..1d3b7c6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
@@ -150,6 +150,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
}
@Override
+ public String toString() {
+ return toPrintableString();
+ }
+
+ @Override
public String toPrintableString() {
return "vertex count: " + vertexCount
+ ", average clustering coefficient: " + averageLocalClusteringCoefficient;
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
index a6b0baa..ae15d40 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
@@ -137,6 +137,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
}
@Override
+ public String toString() {
+ return toPrintableString();
+ }
+
+ @Override
public String toPrintableString() {
return "triplet count: " + tripletCount
+ ", triangle count: " + triangleCount
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
index 93eadc5..f99df1c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
@@ -504,6 +504,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
}
@Override
+ public String toString() {
+ return toPrintableString();
+ }
+
+ @Override
public String toPrintableString() {
NumberFormat nf = NumberFormat.getInstance();
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
index c0ddb05..2885d6e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
@@ -150,6 +150,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
}
@Override
+ public String toString() {
+ return toPrintableString();
+ }
+
+ @Override
public String toPrintableString() {
return "vertex count: " + vertexCount
+ ", average clustering coefficient: " + averageLocalClusteringCoefficient;
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
index ef212ae..1a7314e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
@@ -136,6 +136,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
}
@Override
+ public String toString() {
+ return toPrintableString();
+ }
+
+ @Override
public String toPrintableString() {
return "triplet count: " + tripletCount
+ ", triangle count: " + triangleCount
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
index f440098..24dcade 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
@@ -198,6 +198,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
}
@Override
+ public String toString() {
+ return toPrintableString();
+ }
+
+ @Override
public String toPrintableString() {
NumberFormat nf = NumberFormat.getInstance();
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
index e59240b..9ec1eea 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
@@ -48,6 +48,7 @@ import org.apache.flink.util.Collector;
import org.apache.flink.util.Preconditions;
import java.util.Collection;
+import java.util.Iterator;
/**
* Hyperlink-Induced Topic Search computes two interdependent scores for every
@@ -387,12 +388,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
public void open(Configuration parameters) throws Exception {
super.open(parameters);
- Collection<DoubleValue> var;
- var = getRuntimeContext().getBroadcastVariable(HUBBINESS_SUM_SQUARED);
- hubbinessRootSumSquared = Math.sqrt(var.iterator().next().getValue());
+ Collection<DoubleValue> hubbinessSumSquared = getRuntimeContext().getBroadcastVariable(HUBBINESS_SUM_SQUARED);
+ Iterator<DoubleValue> hubbinessSumSquaredIterator = hubbinessSumSquared.iterator();
+ this.hubbinessRootSumSquared = hubbinessSumSquaredIterator.hasNext() ? Math.sqrt(hubbinessSumSquaredIterator.next().getValue()) : Double.NaN;
- var = getRuntimeContext().getBroadcastVariable(AUTHORITY_SUM_SQUARED);
- authorityRootSumSquared = Math.sqrt(var.iterator().next().getValue());
+ Collection<DoubleValue> authoritySumSquared = getRuntimeContext().getBroadcastVariable(AUTHORITY_SUM_SQUARED);
+ Iterator<DoubleValue> authoritySumSquaredIterator = authoritySumSquared.iterator();
+ authorityRootSumSquared = authoritySumSquaredIterator.hasNext() ? Math.sqrt(authoritySumSquaredIterator.next().getValue()) : Double.NaN;
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index 71c37aa..d259fac 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -84,6 +84,9 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
private double convergenceThreshold;
+ // Optional configuration
+ private boolean includeZeroDegreeVertices = false;
+
/**
* PageRank with a fixed number of iterations.
*
@@ -126,6 +129,42 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
this.convergenceThreshold = convergenceThreshold;
}
+ /**
+ * This PageRank implementation properly handles both source and sink
+ * vertices which have, respectively, only outgoing and incoming edges.
+ *
+ * <p>Setting this flag includes "zero-degree" vertices in the PageRank
+ * computation and result. These vertices are handled the same as other
+ * "source" vertices (with a consistent score of
+ * <code>(1 - damping factor) / number of vertices</code>) but only
+ * affect the scores of other vertices indirectly, by taking their
+ * proportional share of the "random jump" score.
+ *
+ * <p>The cost of including zero-degree vertices is a reduce that
+ * deduplicates the vertex set, followed by an outer join on the vertex
+ * degree DataSet.
+ *
+ * @param includeZeroDegreeVertices whether to include zero-degree vertices in the iterative computation
+ * @return this
+ */
+ public PageRank<K, VV, EV> setIncludeZeroDegreeVertices(boolean includeZeroDegreeVertices) {
+ this.includeZeroDegreeVertices = includeZeroDegreeVertices;
+
+ return this;
+ }
+
+ @Override
+ protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+ if (!super.canMergeConfigurationWith(other)) {
+ return false;
+ }
+
+ PageRank rhs = (PageRank) other;
+
+ return dampingFactor == rhs.dampingFactor &&
+ includeZeroDegreeVertices == rhs.includeZeroDegreeVertices;
+ }
+
@Override
protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
super.mergeConfiguration(other);
@@ -142,6 +181,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
// vertex degree
DataSet<Vertex<K, Degrees>> vertexDegree = input
.run(new VertexDegrees<K, VV, EV>()
+ .setIncludeZeroDegreeVertices(includeZeroDegreeVertices)
.setParallelism(parallelism));
// vertex count
@@ -158,7 +198,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
// vertices with zero in-edges
DataSet<Tuple2<K, DoubleValue>> sourceVertices = vertexDegree
.flatMap(new InitializeSourceVertices<>())
- .withBroadcastSet(vertexCount, VERTEX_COUNT)
.setParallelism(parallelism)
.name("Initialize source vertex scores");
@@ -285,7 +324,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
super.open(parameters);
Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
- output.f1 = new DoubleValue(1.0 / vertexCount.iterator().next().getValue());
+ Iterator<LongValue> vertexCountIterator = vertexCount.iterator();
+ output.f1 = new DoubleValue(vertexCountIterator.hasNext() ? 1.0 / vertexCountIterator.next().getValue() : Double.NaN);
}
@Override
@@ -376,11 +416,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
super.open(parameters);
Collection<Tuple2<T, DoubleValue>> sumOfScores = getRuntimeContext().getBroadcastVariable(SUM_OF_SCORES);
+ Iterator<Tuple2<T, DoubleValue>> sumOfScoresIterator = sumOfScores.iterator();
// floating point precision error is also included in sumOfSinks
- double sumOfSinks = 1 - sumOfScores.iterator().next().f1.getValue();
+ double sumOfSinks = 1 - (sumOfScoresIterator.hasNext() ? sumOfScoresIterator.next().f1.getValue() : 0);
Collection<LongValue> vertexCount = getRuntimeContext().getBroadcastVariable(VERTEX_COUNT);
- this.vertexCount = vertexCount.iterator().next().getValue();
+ Iterator<LongValue> vertexCountIterator = vertexCount.iterator();
+ this.vertexCount = vertexCountIterator.hasNext() ? vertexCountIterator.next().getValue() : 0;
this.uniformlyDistributedScore = ((1 - dampingFactor) + dampingFactor * sumOfSinks) / this.vertexCount;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index 7294fd1..796667f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -310,6 +310,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
}
@Override
+ public String toString() {
+ return toPrintableString();
+ }
+
+ @Override
public String toPrintableString() {
NumberFormat nf = NumberFormat.getInstance();
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 97ee6fa..87c50ce 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -307,6 +307,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
}
@Override
+ public String toString() {
+ return toPrintableString();
+ }
+
+ @Override
public String toPrintableString() {
NumberFormat nf = NumberFormat.getInstance();
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index 8c520e6..3392d47 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -283,6 +283,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
}
@Override
+ public String toString() {
+ return toPrintableString();
+ }
+
+ @Override
public String toPrintableString() {
NumberFormat nf = NumberFormat.getInstance();
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index 1116149..39fec4a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -257,6 +257,11 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
}
@Override
+ public String toString() {
+ return toPrintableString();
+ }
+
+ @Override
public String toPrintableString() {
NumberFormat nf = NumberFormat.getInstance();
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 752e206..701e698 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -410,7 +410,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
@Override
public void reduce(Iterable<Tuple3<T, T, FloatValue>> values, Collector<Result<T>> out)
throws Exception {
- float sum = 0;
+ double sum = 0;
Tuple3<T, T, FloatValue> edge = null;
for (Tuple3<T, T, FloatValue> next : values) {
@@ -421,7 +421,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
if (sum >= minimumScore) {
output.setVertexId0(edge.f0);
output.setVertexId1(edge.f1);
- output.setAdamicAdarScore(sum);
+ output.setAdamicAdarScore((float) sum);
out.collect(output);
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 1afb5da..1b6acbc 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -24,6 +24,7 @@ import org.apache.flink.graph.Graph;
import org.apache.flink.graph.generator.CompleteGraph;
import org.apache.flink.graph.generator.EmptyGraph;
import org.apache.flink.graph.generator.RMatGraph;
+import org.apache.flink.graph.generator.StarGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.types.IntValue;
import org.apache.flink.types.LongValue;
@@ -56,11 +57,17 @@ public class AsmTestBase {
// empty graph
protected final long emptyGraphVertexCount = 3;
- protected Graph<LongValue, NullValue, NullValue> emptyGraph;
+ protected Graph<LongValue, NullValue, NullValue> emptyGraphWithVertices;
+
+ protected Graph<LongValue, NullValue, NullValue> emptyGraphWithoutVertices;
+
+ // star graph
+ protected final long starGraphVertexCount = 29;
+
+ protected Graph<LongValue, NullValue, NullValue> starGraph;
@Before
- public void setup()
- throws Exception {
+ public void setup() throws Exception {
env = ExecutionEnvironment.createCollectionsEnvironment();
env.getConfig().enableObjectReuse();
@@ -89,8 +96,16 @@ public class AsmTestBase {
completeGraph = new CompleteGraph(env, completeGraphVertexCount)
.generate();
- // empty graph
- emptyGraph = new EmptyGraph(env, emptyGraphVertexCount)
+ // empty graph with vertices but no edges
+ emptyGraphWithVertices = new EmptyGraph(env, emptyGraphVertexCount)
+ .generate();
+
+ // empty graph with no vertices or edges
+ emptyGraphWithoutVertices = new EmptyGraph(env, 0)
+ .generate();
+
+ // star graph
+ starGraph = new StarGraph(env, starGraphVertexCount)
.generate();
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
index 7d82b80..a31ce2e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/ChecksumHashCodeTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.graph.asm.dataset;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
@@ -27,6 +29,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -39,15 +42,13 @@ public class ChecksumHashCodeTest {
private ExecutionEnvironment env;
@Before
- public void setup()
- throws Exception {
+ public void setup() throws Exception {
env = ExecutionEnvironment.createCollectionsEnvironment();
env.getConfig().enableObjectReuse();
}
@Test
- public void testChecksumHashCode()
- throws Exception {
+ public void testList() throws Exception {
List<Long> list = Arrays.asList(ArrayUtils.toObject(
new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
@@ -58,4 +59,14 @@ public class ChecksumHashCodeTest {
assertEquals(list.size(), checksum.getCount());
assertEquals(list.size() * (list.size() - 1) / 2, checksum.getChecksum());
}
+
+ @Test
+ public void testEmptyList() throws Exception {
+ DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){}));
+
+ Checksum checksum = new ChecksumHashCode<Long>().run(dataset).execute();
+
+ assertEquals(0, checksum.getCount());
+ assertEquals(0, checksum.getChecksum());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
index 29b454b..cfeadce 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CollectTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.graph.asm.dataset;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -26,9 +28,11 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
/**
* Tests for {@link Collect}.
@@ -38,15 +42,13 @@ public class CollectTest {
private ExecutionEnvironment env;
@Before
- public void setup()
- throws Exception {
+ public void setup() throws Exception {
env = ExecutionEnvironment.createCollectionsEnvironment();
env.getConfig().enableObjectReuse();
}
@Test
- public void testCollect()
- throws Exception {
+ public void testList() throws Exception {
List<Long> list = Arrays.asList(ArrayUtils.toObject(
new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
@@ -56,4 +58,13 @@ public class CollectTest {
assertArrayEquals(list.toArray(), collected.toArray());
}
+
+ @Test
+ public void testEmptyList() throws Exception {
+ DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){}));
+
+ List<Long> collected = new Collect<Long>().run(dataset).execute();
+
+ assertEquals(0, collected.size());
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
index a1160ce..0167a5f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/dataset/CountTest.java
@@ -18,6 +18,8 @@
package org.apache.flink.graph.asm.dataset;
+import org.apache.flink.api.common.typeinfo.TypeHint;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
@@ -26,6 +28,7 @@ import org.junit.Before;
import org.junit.Test;
import java.util.Arrays;
+import java.util.Collections;
import java.util.List;
import static org.junit.Assert.assertEquals;
@@ -38,15 +41,13 @@ public class CountTest {
private ExecutionEnvironment env;
@Before
- public void setup()
- throws Exception {
+ public void setup() throws Exception {
env = ExecutionEnvironment.createCollectionsEnvironment();
env.getConfig().enableObjectReuse();
}
@Test
- public void testCount()
- throws Exception {
+ public void testList() throws Exception {
List<Long> list = Arrays.asList(ArrayUtils.toObject(
new long[]{ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 }));
@@ -56,4 +57,13 @@ public class CountTest {
assertEquals(list.size(), count);
}
+
+ @Test
+ public void testEmptyList() throws Exception {
+ DataSet<Long> dataset = env.fromCollection(Collections.emptyList(), TypeInformation.of(new TypeHint<Long>(){}));
+
+ long count = new Count<Long>().run(dataset).execute();
+
+ assertEquals(0, count);
+ }
}
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
index 63bf133..08ba4aa 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -37,12 +37,10 @@ import static org.junit.Assert.assertEquals;
/**
* Tests for {@link EdgeDegreesPair}.
*/
-public class EdgeDegreesPairTest
-extends AsmTestBase {
+public class EdgeDegreesPairTest extends AsmTestBase {
@Test
- public void testWithSimpleGraph()
- throws Exception {
+ public void testWithSimpleGraph() throws Exception {
String expectedResult =
"(0,1,((null),(2,2,0),(3,0,3)))\n" +
"(0,2,((null),(2,2,0),(3,2,1)))\n" +
@@ -59,8 +57,23 @@ extends AsmTestBase {
}
@Test
- public void testWithRMatGraph()
- throws Exception {
+ public void testWithEmptyGraphWithVertices() throws Exception {
+ DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = emptyGraphWithVertices
+ .run(new EdgeDegreesPair<>());
+
+ assertEquals(0, degreesPair.collect().size());
+ }
+
+ @Test
+ public void testWithEmptyGraphWithoutVertices() throws Exception {
+ DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = emptyGraphWithoutVertices
+ .run(new EdgeDegreesPair<>());
+
+ assertEquals(0, degreesPair.collect().size());
+ }
+
+ @Test
+ public void testWithRMatGraph() throws Exception {
DataSet<Edge<LongValue, Tuple3<NullValue, Degrees, Degrees>>> degreesPair = directedRMatGraph(10, 16)
.run(new EdgeDegreesPair<>());
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
index 967cfb2..884e2d5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -37,12 +37,10 @@ import static org.junit.Assert.assertEquals;
/**
* Tests for {@link EdgeSourceDegrees}.
*/
-public class EdgeSourceDegreesTest
-extends AsmTestBase {
+public class EdgeSourceDegreesTest extends AsmTestBase {
@Test
- public void testWithSimpleGraph()
- throws Exception {
+ public void testWithSimpleGraph() throws Exception {
String expectedResult =
"(0,1,((null),(2,2,0)))\n" +
"(0,2,((null),(2,2,0)))\n" +
@@ -59,8 +57,23 @@ extends AsmTestBase {
}
@Test
- public void testWithRMatGraph()
- throws Exception {
+ public void testWithEmptyGraphWithVertices() throws Exception {
+ DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = emptyGraphWithVertices
+ .run(new EdgeSourceDegrees<>());
+
+ assertEquals(0, sourceDegrees.collect().size());
+ }
+
+ @Test
+ public void testWithEmptyGraphWithoutVertices() throws Exception {
+ DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = emptyGraphWithoutVertices
+ .run(new EdgeSourceDegrees<>());
+
+ assertEquals(0, sourceDegrees.collect().size());
+ }
+
+ @Test
+ public void testWithRMatGraph() throws Exception {
DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> sourceDegrees = directedRMatGraph(10, 16)
.run(new EdgeSourceDegrees<>());
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
index abb76c4..5afd64c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -37,12 +37,10 @@ import static org.junit.Assert.assertEquals;
/**
* Tests for {@link EdgeTargetDegrees}.
*/
-public class EdgeTargetDegreesTest
-extends AsmTestBase {
+public class EdgeTargetDegreesTest extends AsmTestBase {
@Test
- public void testWithSimpleGraph()
- throws Exception {
+ public void testWithSimpleGraph() throws Exception {
String expectedResult =
"(0,1,((null),(3,0,3)))\n" +
"(0,2,((null),(3,2,1)))\n" +
@@ -59,8 +57,23 @@ extends AsmTestBase {
}
@Test
- public void testWithRMatGraph()
- throws Exception {
+ public void testWithEmptyGraphWithVertices() throws Exception {
+ DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> targetDegrees = emptyGraphWithVertices
+ .run(new EdgeTargetDegrees<>());
+
+ assertEquals(0, targetDegrees.collect().size());
+ }
+
+ @Test
+ public void testWithEmptyGraphWithoutVertices() throws Exception {
+ DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> targetDegrees = emptyGraphWithoutVertices
+ .run(new EdgeTargetDegrees<>());
+
+ assertEquals(0, targetDegrees.collect().size());
+ }
+
+ @Test
+ public void testWithRMatGraph() throws Exception {
DataSet<Edge<LongValue, Tuple2<NullValue, Degrees>>> targetDegrees = directedRMatGraph(10, 16)
.run(new EdgeTargetDegrees<>());
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
index 91f354f..7b89fe7 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals;
/**
* Tests for {@link VertexDegrees}.
*/
-public class VertexDegreesTest
-extends AsmTestBase {
+public class VertexDegreesTest extends AsmTestBase {
@Test
- public void testWithSimpleDirectedGraph()
- throws Exception {
+ public void testWithDirectedSimpleGraph() throws Exception {
DataSet<Vertex<IntValue, Degrees>> degrees = directedSimpleGraph
.run(new VertexDegrees<>());
@@ -57,8 +55,7 @@ extends AsmTestBase {
}
@Test
- public void testWithSimpleUndirectedGraph()
- throws Exception {
+ public void testWithUndirectedSimpleGraph() throws Exception {
DataSet<Vertex<IntValue, Degrees>> degrees = undirectedSimpleGraph
.run(new VertexDegrees<>());
@@ -74,17 +71,14 @@ extends AsmTestBase {
}
@Test
- public void testWithEmptyGraph()
- throws Exception {
- DataSet<Vertex<LongValue, Degrees>> degrees;
-
- degrees = emptyGraph
+ public void testWithEmptyGraphWithVertices() throws Exception {
+ DataSet<Vertex<LongValue, Degrees>> degreesWithoutZeroDegreeVertices = emptyGraphWithVertices
.run(new VertexDegrees<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
- assertEquals(0, degrees.collect().size());
+ assertEquals(0, degreesWithoutZeroDegreeVertices.collect().size());
- degrees = emptyGraph
+ DataSet<Vertex<LongValue, Degrees>> degreesWithZeroDegreeVertices = emptyGraphWithVertices
.run(new VertexDegrees<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
@@ -93,12 +87,26 @@ extends AsmTestBase {
"(1,(0,0,0))\n" +
"(2,(0,0,0))";
- TestBaseUtils.compareResultAsText(degrees.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(degreesWithZeroDegreeVertices.collect(), expectedResult);
+ }
+
+ @Test
+ public void testWithEmptyGraphWithoutVertices() throws Exception {
+ DataSet<Vertex<LongValue, Degrees>> degreesWithoutZeroDegreeVertices = emptyGraphWithoutVertices
+ .run(new VertexDegrees<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(false));
+
+ assertEquals(0, degreesWithoutZeroDegreeVertices.collect().size());
+
+ DataSet<Vertex<LongValue, Degrees>> degreesWithZeroDegreeVertices = emptyGraphWithoutVertices
+ .run(new VertexDegrees<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(true));
+
+ assertEquals(0, degreesWithZeroDegreeVertices.collect().size());
}
@Test
- public void testWithRMatGraph()
- throws Exception {
+ public void testWithRMatGraph() throws Exception {
DataSet<Vertex<LongValue, Degrees>> degrees = directedRMatGraph(10, 16)
.run(new VertexDegrees<>());
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
index f671cab..7f6c9e3 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
@@ -35,12 +35,10 @@ import static org.junit.Assert.assertEquals;
/**
* Tests for {@link VertexInDegree}.
*/
-public class VertexInDegreeTest
-extends AsmTestBase {
+public class VertexInDegreeTest extends AsmTestBase {
@Test
- public void testWithSimpleGraph()
- throws Exception {
+ public void testWithDirectedSimpleGraph() throws Exception {
DataSet<Vertex<IntValue, LongValue>> inDegree = directedSimpleGraph
.run(new VertexInDegree<IntValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
@@ -57,17 +55,31 @@ extends AsmTestBase {
}
@Test
- public void testWithEmptyGraph()
- throws Exception {
- DataSet<Vertex<LongValue, LongValue>> inDegree;
+ public void testWithUndirectedSimpleGraph() throws Exception {
+ DataSet<Vertex<IntValue, LongValue>> inDegree = undirectedSimpleGraph
+ .run(new VertexInDegree<IntValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(true));
+
+ String expectedResult =
+ "(0,2)\n" +
+ "(1,3)\n" +
+ "(2,3)\n" +
+ "(3,4)\n" +
+ "(4,1)\n" +
+ "(5,1)";
- inDegree = emptyGraph
+ TestBaseUtils.compareResultAsText(inDegree.collect(), expectedResult);
+ }
+
+ @Test
+ public void testWithEmptyGraphWithVertices() throws Exception {
+ DataSet<Vertex<LongValue, LongValue>> inDegreeWithoutZeroDegreeVertices = emptyGraphWithVertices
.run(new VertexInDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
- assertEquals(0, inDegree.collect().size());
+ assertEquals(0, inDegreeWithoutZeroDegreeVertices.collect().size());
- inDegree = emptyGraph
+ DataSet<Vertex<LongValue, LongValue>> inDegreeWithZeroDegreeVertices = emptyGraphWithVertices
.run(new VertexInDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
@@ -76,12 +88,26 @@ extends AsmTestBase {
"(1,0)\n" +
"(2,0)";
- TestBaseUtils.compareResultAsText(inDegree.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(inDegreeWithZeroDegreeVertices.collect(), expectedResult);
+ }
+
+ @Test
+ public void testWithEmptyGraphWithoutVertices() throws Exception {
+ DataSet<Vertex<LongValue, LongValue>> inDegreeWithoutZeroDegreeVertices = emptyGraphWithoutVertices
+ .run(new VertexInDegree<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(false));
+
+ assertEquals(0, inDegreeWithoutZeroDegreeVertices.collect().size());
+
+ DataSet<Vertex<LongValue, LongValue>> inDegreeWithZeroDegreeVertices = emptyGraphWithoutVertices
+ .run(new VertexInDegree<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(true));
+
+ assertEquals(0, inDegreeWithZeroDegreeVertices.collect().size());
}
@Test
- public void testWithRMatGraph()
- throws Exception {
+ public void testWithRMatGraph() throws Exception {
DataSet<Vertex<LongValue, LongValue>> inDegree = directedRMatGraph(10, 16)
.run(new VertexInDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
index 1517f23..7031d8f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
@@ -35,12 +35,10 @@ import static org.junit.Assert.assertEquals;
/**
* Tests for {@link VertexOutDegree}.
*/
-public class VertexOutDegreeTest
-extends AsmTestBase {
+public class VertexOutDegreeTest extends AsmTestBase {
@Test
- public void testWithSimpleGraph()
- throws Exception {
+ public void testWithDirectedSimpleGraph() throws Exception {
DataSet<Vertex<IntValue, LongValue>> outDegree = directedSimpleGraph
.run(new VertexOutDegree<IntValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
@@ -57,17 +55,31 @@ extends AsmTestBase {
}
@Test
- public void testWithEmptyGraph()
- throws Exception {
- DataSet<Vertex<LongValue, LongValue>> outDegree;
+ public void testWithUndirectedSimpleGraph() throws Exception {
+ DataSet<Vertex<IntValue, LongValue>> outDegree = undirectedSimpleGraph
+ .run(new VertexOutDegree<IntValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(true));
+
+ String expectedResult =
+ "(0,2)\n" +
+ "(1,3)\n" +
+ "(2,3)\n" +
+ "(3,4)\n" +
+ "(4,1)\n" +
+ "(5,1)";
- outDegree = emptyGraph
+ TestBaseUtils.compareResultAsText(outDegree.collect(), expectedResult);
+ }
+
+ @Test
+ public void testWithEmptyGraphWithVertices() throws Exception {
+ DataSet<Vertex<LongValue, LongValue>> outDegreeWithoutZeroDegreeVertices = emptyGraphWithVertices
.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
- assertEquals(0, outDegree.collect().size());
+ assertEquals(0, outDegreeWithoutZeroDegreeVertices.collect().size());
- outDegree = emptyGraph
+ DataSet<Vertex<LongValue, LongValue>> outDegreeWithZeroDegreeVertices = emptyGraphWithVertices
.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
@@ -76,12 +88,26 @@ extends AsmTestBase {
"(1,0)\n" +
"(2,0)";
- TestBaseUtils.compareResultAsText(outDegree.collect(), expectedResult);
+ TestBaseUtils.compareResultAsText(outDegreeWithZeroDegreeVertices.collect(), expectedResult);
+ }
+
+ @Test
+ public void testWithEmptyGraphWithoutVertices() throws Exception {
+ DataSet<Vertex<LongValue, LongValue>> outDegreeWithoutZeroDegreeVertices = emptyGraphWithoutVertices
+ .run(new VertexOutDegree<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(false));
+
+ assertEquals(0, outDegreeWithoutZeroDegreeVertices.collect().size());
+
+ DataSet<Vertex<LongValue, LongValue>> outDegreeWithZeroDegreeVertices = emptyGraphWithoutVertices
+ .run(new VertexOutDegree<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(true));
+
+ assertEquals(0, outDegreeWithZeroDegreeVertices.collect().size());
}
@Test
- public void testWithRMatGraph()
- throws Exception {
+ public void testWithRMatGraph() throws Exception {
DataSet<Vertex<LongValue, LongValue>> outDegree = directedRMatGraph(10, 16)
.run(new VertexOutDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
index 1cae2e7..95a89c5 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePairTest.java
@@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals;
/**
* Tests for {@link EdgeDegreePair}.
*/
-public class EdgeDegreePairTest
-extends AsmTestBase {
+public class EdgeDegreePairTest extends AsmTestBase {
@Test
- public void testWithSimpleGraph()
- throws Exception {
+ public void testWithSimpleGraph() throws Exception {
String expectedResult =
"(0,1,((null),2,3))\n" +
"(0,2,((null),2,3))\n" +
@@ -59,7 +57,8 @@ extends AsmTestBase {
"(5,3,((null),1,4))";
DataSet<Edge<IntValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedSimpleGraph
- .run(new EdgeDegreePair<>());
+ .run(new EdgeDegreePair<IntValue, NullValue, NullValue>()
+ .setReduceOnTargetId(false));
TestBaseUtils.compareResultAsText(degreePairOnSourceId.collect(), expectedResult);
@@ -71,10 +70,40 @@ extends AsmTestBase {
}
@Test
- public void testWithRMatGraph()
- throws Exception {
+ public void testWithEmptyGraphWithVertices() throws Exception {
+ DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = emptyGraphWithVertices
+ .run(new EdgeDegreePair<LongValue, NullValue, NullValue>()
+ .setReduceOnTargetId(false));
+
+ assertEquals(0, degreePairOnSourceId.collect().size());
+
+ DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnTargetId = emptyGraphWithVertices
+ .run(new EdgeDegreePair<LongValue, NullValue, NullValue>()
+ .setReduceOnTargetId(true));
+
+ assertEquals(0, degreePairOnTargetId.collect().size());
+ }
+
+ @Test
+ public void testWithEmptyGraphWithoutVertices() throws Exception {
+ DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = emptyGraphWithoutVertices
+ .run(new EdgeDegreePair<LongValue, NullValue, NullValue>()
+ .setReduceOnTargetId(false));
+
+ assertEquals(0, degreePairOnSourceId.collect().size());
+
+ DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnTargetId = emptyGraphWithoutVertices
+ .run(new EdgeDegreePair<LongValue, NullValue, NullValue>()
+ .setReduceOnTargetId(true));
+
+ assertEquals(0, degreePairOnTargetId.collect().size());
+ }
+
+ @Test
+ public void testWithRMatGraph() throws Exception {
DataSet<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>> degreePairOnSourceId = undirectedRMatGraph(10, 16)
- .run(new EdgeDegreePair<>());
+ .run(new EdgeDegreePair<LongValue, NullValue, NullValue>()
+ .setReduceOnTargetId(false));
Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple3<NullValue, LongValue, LongValue>>>()
.run(degreePairOnSourceId)
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
index 2d8b2e3..3802b4f 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegreeTest.java
@@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals;
/**
* Tests for {@link EdgeSourceDegree}.
*/
-public class EdgeSourceDegreeTest
-extends AsmTestBase {
+public class EdgeSourceDegreeTest extends AsmTestBase {
@Test
- public void testWithSimpleGraph()
- throws Exception {
+ public void testWithSimpleGraph() throws Exception {
String expectedResult =
"(0,1,((null),2))\n" +
"(0,2,((null),2))\n" +
@@ -59,7 +57,8 @@ extends AsmTestBase {
"(5,3,((null),1))";
DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedSimpleGraph
- .run(new EdgeSourceDegree<>());
+ .run(new EdgeSourceDegree<IntValue, NullValue, NullValue>()
+ .setReduceOnTargetId(false));
TestBaseUtils.compareResultAsText(sourceDegreeOnSourceId.collect(), expectedResult);
@@ -71,10 +70,40 @@ extends AsmTestBase {
}
@Test
- public void testWithRMatGraph()
- throws Exception {
+ public void testWithEmptyGraphWithVertices() throws Exception {
+ DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = emptyGraphWithVertices
+ .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()
+ .setReduceOnTargetId(false));
+
+ assertEquals(0, sourceDegreeOnSourceId.collect().size());
+
+ DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnTargetId = emptyGraphWithVertices
+ .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()
+ .setReduceOnTargetId(true));
+
+ assertEquals(0, sourceDegreeOnTargetId.collect().size());
+ }
+
+ @Test
+ public void testWithEmptyGraphWithoutVertices() throws Exception {
+ DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = emptyGraphWithoutVertices
+ .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()
+ .setReduceOnTargetId(false));
+
+ assertEquals(0, sourceDegreeOnSourceId.collect().size());
+
+ DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnTargetId = emptyGraphWithoutVertices
+ .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()
+ .setReduceOnTargetId(true));
+
+ assertEquals(0, sourceDegreeOnTargetId.collect().size());
+ }
+
+ @Test
+ public void testWithRMatGraph() throws Exception {
DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> sourceDegreeOnSourceId = undirectedRMatGraph(10, 16)
- .run(new EdgeSourceDegree<>());
+ .run(new EdgeSourceDegree<LongValue, NullValue, NullValue>()
+ .setReduceOnTargetId(false));
Checksum checksumOnSourceId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
.run(sourceDegreeOnSourceId)
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
index a7c88a1..51f880b 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegreeTest.java
@@ -36,12 +36,10 @@ import static org.junit.Assert.assertEquals;
/**
* Tests for {@link EdgeTargetDegree}.
*/
-public class EdgeTargetDegreeTest
-extends AsmTestBase {
+public class EdgeTargetDegreeTest extends AsmTestBase {
@Test
- public void testWithSimpleGraph()
- throws Exception {
+ public void testWithSimpleGraph() throws Exception {
String expectedResult =
"(0,1,((null),3))\n" +
"(0,2,((null),3))\n" +
@@ -59,7 +57,8 @@ extends AsmTestBase {
"(5,3,((null),4))";
DataSet<Edge<IntValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedSimpleGraph
- .run(new EdgeTargetDegree<>());
+ .run(new EdgeTargetDegree<IntValue, NullValue, NullValue>()
+ .setReduceOnSourceId(false));
TestBaseUtils.compareResultAsText(targetDegreeOnTargetId.collect(), expectedResult);
@@ -71,10 +70,40 @@ extends AsmTestBase {
}
@Test
- public void testWithRMatGraph()
- throws Exception {
+ public void testWithEmptyGraphWithVertices() throws Exception {
+ DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = emptyGraphWithVertices
+ .run(new EdgeTargetDegree<LongValue, NullValue, NullValue>()
+ .setReduceOnSourceId(false));
+
+ assertEquals(0, targetDegreeOnTargetId.collect().size());
+
+ DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnSourceId = emptyGraphWithVertices
+ .run(new EdgeTargetDegree<LongValue, NullValue, NullValue>()
+ .setReduceOnSourceId(true));
+
+ assertEquals(0, targetDegreeOnSourceId.collect().size());
+ }
+
+ @Test
+ public void testWithEmptyGraphWithoutVertices() throws Exception {
+ DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = emptyGraphWithoutVertices
+ .run(new EdgeTargetDegree<LongValue, NullValue, NullValue>()
+ .setReduceOnSourceId(false));
+
+ assertEquals(0, targetDegreeOnTargetId.collect().size());
+
+ DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnSourceId = emptyGraphWithoutVertices
+ .run(new EdgeTargetDegree<LongValue, NullValue, NullValue>()
+ .setReduceOnSourceId(true));
+
+ assertEquals(0, targetDegreeOnSourceId.collect().size());
+ }
+
+ @Test
+ public void testWithRMatGraph() throws Exception {
DataSet<Edge<LongValue, Tuple2<NullValue, LongValue>>> targetDegreeOnTargetId = undirectedRMatGraph(10, 16)
- .run(new EdgeSourceDegree<>());
+ .run(new EdgeTargetDegree<LongValue, NullValue, NullValue>()
+ .setReduceOnSourceId(false));
Checksum checksumOnTargetId = new ChecksumHashCode<Edge<LongValue, Tuple2<NullValue, LongValue>>>()
.run(targetDegreeOnTargetId)
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
index bc76bff..49f0007 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegreeTest.java
@@ -35,12 +35,10 @@ import static org.junit.Assert.assertEquals;
/**
* Tests for {@link VertexDegree}.
*/
-public class VertexDegreeTest
-extends AsmTestBase {
+public class VertexDegreeTest extends AsmTestBase {
@Test
- public void testWithSimpleGraph()
- throws Exception {
+ public void testWithSimpleGraph() throws Exception {
String expectedResult =
"(0,2)\n" +
"(1,3)\n" +
@@ -50,7 +48,8 @@ extends AsmTestBase {
"(5,1)";
DataSet<Vertex<IntValue, LongValue>> degreeOnSourceId = undirectedSimpleGraph
- .run(new VertexDegree<>());
+ .run(new VertexDegree<IntValue, NullValue, NullValue>()
+ .setReduceOnTargetId(false));
TestBaseUtils.compareResultAsText(degreeOnSourceId.collect(), expectedResult);
@@ -62,12 +61,12 @@ extends AsmTestBase {
}
@Test
- public void testWithCompleteGraph()
- throws Exception {
+ public void testWithCompleteGraph() throws Exception {
long expectedDegree = completeGraphVertexCount - 1;
DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = completeGraph
- .run(new VertexDegree<>());
+ .run(new VertexDegree<LongValue, NullValue, NullValue>()
+ .setReduceOnTargetId(false));
for (Vertex<LongValue, LongValue> vertex : degreeOnSourceId.collect()) {
assertEquals(expectedDegree, vertex.getValue().getValue());
@@ -83,17 +82,16 @@ extends AsmTestBase {
}
@Test
- public void testWithEmptyGraph()
- throws Exception {
+ public void testWithEmptyGraphWithVertices() throws Exception {
DataSet<Vertex<LongValue, LongValue>> degree;
- degree = emptyGraph
+ degree = emptyGraphWithVertices
.run(new VertexDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(false));
assertEquals(0, degree.collect().size());
- degree = emptyGraph
+ degree = emptyGraphWithVertices
.run(new VertexDegree<LongValue, NullValue, NullValue>()
.setIncludeZeroDegreeVertices(true));
@@ -106,10 +104,27 @@ extends AsmTestBase {
}
@Test
- public void testWithRMatGraph()
- throws Exception {
+ public void testWithEmptyGraphWithoutVertices() throws Exception {
+ DataSet<Vertex<LongValue, LongValue>> degree;
+
+ degree = emptyGraphWithoutVertices
+ .run(new VertexDegree<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(false));
+
+ assertEquals(0, degree.collect().size());
+
+ degree = emptyGraphWithoutVertices
+ .run(new VertexDegree<LongValue, NullValue, NullValue>()
+ .setIncludeZeroDegreeVertices(true));
+
+ assertEquals(0, degree.collect().size());
+ }
+
+ @Test
+ public void testWithRMatGraph() throws Exception {
DataSet<Vertex<LongValue, LongValue>> degreeOnSourceId = undirectedRMatGraph(10, 16)
- .run(new VertexDegree<>());
+ .run(new VertexDegree<LongValue, NullValue, NullValue>()
+ .setReduceOnTargetId(false));
Checksum checksumOnSourceId = new ChecksumHashCode<Vertex<LongValue, LongValue>>()
.run(degreeOnSourceId)
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
index 51e7712..f43be9c 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.graph.asm.dataset.ChecksumHashCode.Checksum;
import org.apache.flink.graph.library.metric.ChecksumHashCode;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
import org.apache.flink.types.NullValue;
import org.junit.Test;
@@ -33,12 +34,10 @@ import static org.junit.Assert.assertEquals;
/**
* Tests for {@link MaximumDegree}.
*/
-public class MaximumDegreeTest
-extends AsmTestBase {
+public class MaximumDegreeTest extends AsmTestBase {
@Test
- public void testWithSimpleGraph()
- throws Exception {
+ public void testWithSimpleGraph() throws Exception {
Graph<IntValue, NullValue, NullValue> graph = undirectedSimpleGraph
.run(new MaximumDegree<>(3));
@@ -63,8 +62,25 @@ extends AsmTestBase {
}
@Test
- public void testWithRMatGraph()
- throws Exception {
+ public void testWithEmptyGraphWithVertices() throws Exception {
+ Graph<LongValue, NullValue, NullValue> graph = emptyGraphWithVertices
+ .run(new MaximumDegree<>(1));
+
+ assertEquals(emptyGraphVertexCount, graph.getVertices().collect().size());
+ assertEquals(0, graph.getEdges().collect().size());
+ }
+
+ @Test
+ public void testWithEmptyGraphWithoutVertices() throws Exception {
+ Graph<LongValue, NullValue, NullValue> graph = emptyGraphWithoutVertices
+ .run(new MaximumDegree<>(1));
+
+ assertEquals(0, graph.getVertices().collect().size());
+ assertEquals(0, graph.getEdges().collect().size());
+ }
+
+ @Test
+ public void testWithRMatGraph() throws Exception {
Checksum checksum = undirectedRMatGraph(10, 16)
.run(new MaximumDegree<>(16))
.run(new ChecksumHashCode<>())
http://git-wip-us.apache.org/repos/asf/flink/blob/9437a0ff/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
index 751d030..f31c09a 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
@@ -62,8 +62,7 @@ public class SimplifyTest {
}
@Test
- public void test()
- throws Exception {
+ public void test() throws Exception {
String expectedResult =
"(0,1,(null))\n" +
"(0,2,(null))\n" +