You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/06/07 14:58:13 UTC
[3/3] flink git commit: [FLINK-4013] [gelly] GraphAlgorithms to
simplify directed and undirected graphs
[FLINK-4013] [gelly] GraphAlgorithms to simplify directed and undirected graphs
This closes #2067
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7160a681
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7160a681
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7160a681
Branch: refs/heads/master
Commit: 7160a681240deab693aa4d69c24c4a8a63bb58ba
Parents: a611271
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Jun 2 16:01:00 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jun 7 09:03:26 2016 -0400
----------------------------------------------------------------------
docs/apis/batch/libs/gelly.md | 43 ++++++-
.../apache/flink/graph/examples/Graph500.java | 14 ++-
.../flink/graph/examples/JaccardIndex.java | 5 +-
.../examples/LocalClusteringCoefficient.java | 5 +-
.../flink/graph/examples/TriangleListing.java | 5 +-
.../graph/asm/simple/directed/Simplify.java | 91 ++++++++++++++
.../graph/asm/simple/undirected/Simplify.java | 126 +++++++++++++++++++
.../apache/flink/graph/generator/RMatGraph.java | 52 +-------
.../org/apache/flink/graph/asm/AsmTestBase.java | 13 +-
.../annotate/directed/EdgeDegreesPairTest.java | 6 +-
.../directed/EdgeSourceDegreesTest.java | 6 +-
.../directed/EdgeTargetDegreesTest.java | 6 +-
.../annotate/directed/VertexDegreesTest.java | 6 +-
.../annotate/directed/VertexInDegreeTest.java | 2 +-
.../annotate/directed/VertexOutDegreeTest.java | 2 +-
.../graph/asm/simple/directed/SimplifyTest.java | 75 +++++++++++
.../asm/simple/undirected/SimplifyTest.java | 89 +++++++++++++
.../library/similarity/JaccardIndexTest.java | 5 +-
18 files changed, 471 insertions(+), 80 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index 1f7f271..7adff04 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -2315,6 +2315,34 @@ Graph<K, VV, EV> filteredGraph = graph
</tr>
<tr>
+ <td>simple.directed.<br/><strong>Simplify</strong></td>
+ <td>
+ <p>Remove self-loops and duplicate edges from a <a href="#graph-representation">directed graph</a>.</p>
+{% highlight java %}
+graph.run(new Simplify());
+{% endhighlight %}
+ <p>Optional configuration:</p>
+ <ul>
+ <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+ </ul>
+ </td>
+ </tr>
+
+ <tr>
+ <td>simple.undirected.<br/><strong>Simplify</strong></td>
+ <td>
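+ <p>Add symmetric edges and remove self-loops and duplicate edges from an <a href="#graph-representation">undirected graph</a>. When <code>clipAndFlip</code> is set, only edges where source &gt; target are kept before symmetrizing.</p>
+{% highlight java %}
+graph.run(new Simplify(clipAndFlip));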
+{% endhighlight %}
+ <p>Optional configuration:</p>
+ <ul>
+ <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+ </ul>
+ </td>
+ </tr>
+
+ <tr>
<td>translate.<br/><strong>TranslateGraphIds</strong></td>
<td>
<p>Translate vertex and edge IDs using the given <code>TranslateFunction</code>.</p>
@@ -2325,6 +2353,10 @@ graph.run(new TranslateGraphIds(new LongValueToStringValue()));
<ul>
<li><p><strong>translator</strong>: implements type or value conversion</p></li>
</ul>
+ <p>Optional configuration:</p>
+ <ul>
+ <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+ </ul>
</td>
</tr>
@@ -2339,6 +2371,10 @@ graph.run(new TranslateVertexValues(new LongValueAddOffset(vertexCount)));
<ul>
<li><p><strong>translator</strong>: implements type or value conversion</p></li>
</ul>
+ <p>Optional configuration:</p>
+ <ul>
+ <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+ </ul>
</td>
</tr>
@@ -2353,6 +2389,10 @@ graph.run(new TranslateEdgeValues(new Nullify()));
<ul>
<li><p><strong>translator</strong>: implements type or value conversion</p></li>
</ul>
+ <p>Optional configuration:</p>
+ <ul>
+ <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+ </ul>
</td>
</tr>
</tbody>
@@ -2855,7 +2895,6 @@ boolean clipAndFlip = false;
Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
.setConstants(0.57f, 0.19f, 0.19f)
.setNoise(true, 0.10f)
- .setSimpleGraph(true, clipAndFlip)
.generate();
{% endhighlight %}
</div>
@@ -2872,7 +2911,7 @@ val edgeCount = edgeFactor * vertexCount
clipAndFlip = false
-val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).setConstants(0.57f, 0.19f, 0.19f).setNoise(true, 0.10f).setSimpleGraph(true, clipAndFlip).generate()
+val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).setConstants(0.57f, 0.19f, 0.19f).setNoise(true, 0.10f).generate()
{% endhighlight %}
</div>
</div>
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
index 0daadc1..73bba2c 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
@@ -26,10 +26,13 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.graph.generator.random.RandomGenerableFactory;
import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
import java.text.NumberFormat;
@@ -69,9 +72,14 @@ public class Graph500 {
boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
- DataSet<Tuple2<LongValue,LongValue>> edges = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
- .setSimpleGraph(simplify, clipAndFlip)
- .generate()
+ Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+ .generate();
+
+ if (simplify) {
+ graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+ }
+
+ DataSet<Tuple2<LongValue,LongValue>> edges = graph
.getEdges()
.project(0, 1);
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index f8707d6..c078d73 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
import org.apache.flink.graph.asm.translate.LongValueToIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.generator.RMatGraph;
@@ -118,8 +119,8 @@ public class JaccardIndex {
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
- .setSimpleGraph(true, clipAndFlip)
- .generate();
+ .generate()
+ .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
if (scale > 32) {
ji = graph
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
index 58e7cb6..bed68b2 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
import org.apache.flink.graph.asm.translate.LongValueToIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.generator.RMatGraph;
@@ -74,8 +75,8 @@ public class LocalClusteringCoefficient {
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
- .setSimpleGraph(true, clipAndFlip)
- .generate();
+ .generate()
+ .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
DataSet cc;
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
index 2a9bb76..a20bf20 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
import org.apache.flink.graph.asm.translate.LongValueToIntValue;
import org.apache.flink.graph.asm.translate.TranslateGraphIds;
import org.apache.flink.graph.generator.RMatGraph;
@@ -72,8 +73,8 @@ public class TriangleListing {
boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
- .setSimpleGraph(true, clipAndFlip)
- .generate();
+ .generate()
+ .run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
DataSet tl;
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
new file mode 100644
index 0000000..7362a3e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.asm.simple.directed;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Remove self-loops and duplicate edges from a directed graph.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+
+ // Optional configuration
+ private int parallelism = PARALLELISM_DEFAULT;
+
+ /**
+ * Override the operator parallelism.
+ *
+ * @param parallelism operator parallelism
+ * @return this
+ */
+ public Simplify<K, VV, EV> setParallelism(int parallelism) {
+ Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+ "The parallelism must be greater than zero.");
+
+ this.parallelism = parallelism;
+
+ return this;
+ }
+
+ @Override
+ public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+ throws Exception {
+ // Edges
+ DataSet<Edge<K, EV>> edges = input
+ .getEdges()
+ .filter(new RemoveSelfLoops<K, EV>())
+ .setParallelism(parallelism)
+ .name("Remove self-loops")
+ .distinct(0, 1)
+ .setParallelism(parallelism)
+ .name("Remove duplicate edges");
+
+ // Graph
+ return Graph.fromDataSet(input.getVertices(), edges, input.getContext());
+ }
+
+ /**
+ * Filter out edges where the source and target vertex IDs are equal.
+ *
+ * @param <T> ID type
+ * @param <ET> edge value type
+ */
+ private static class RemoveSelfLoops<T extends Comparable<T>, ET>
+ implements FilterFunction<Edge<T, ET>> {
+ @Override
+ public boolean filter(Edge<T, ET> value) throws Exception {
+ return (value.f0.compareTo(value.f1) != 0);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
new file mode 100644
index 0000000..13ac470
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.asm.simple.undirected;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Add symmetric edges and remove self-loops and duplicate edges from an
+ * undirected graph.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+
+	// Required configuration
+	private final boolean clipAndFlip;
+
+	// Optional configuration
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Simplifies an undirected graph by adding reverse edges and removing
+	 * self-loops and duplicate edges.
+	 *
+	 * <p>When clip-and-flip is set, only edges with {@code source > target} are
+	 * kept before symmetrizing; otherwise every edge except self-loops is kept.
+	 *
+	 * @param clipAndFlip whether to keep only edges with {@code source > target}
+	 */
+	public Simplify(boolean clipAndFlip) {
+		this.clipAndFlip = clipAndFlip;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism (positive, or PARALLELISM_DEFAULT)
+	 * @return this
+	 */
+	public Simplify<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// Edges: filter and symmetrize in one pass, then drop duplicate (source, target) pairs
+		DataSet<Edge<K, EV>> edges = input
+			.getEdges()
+			.flatMap(new SymmetrizeAndRemoveSelfLoops<K, EV>(clipAndFlip))
+				.setParallelism(parallelism)
+				.name("Remove self-loops")
+			.distinct(0, 1)
+				.setParallelism(parallelism)
+				.name("Remove duplicate edges");
+
+		// Graph
+		return Graph.fromDataSet(input.getVertices(), edges, input.getContext());
+	}
+
+	/**
+	 * Emit each retained edge together with its reverse. Self-loops are always
+	 * dropped; with clip-and-flip, edges where {@code source < target} are dropped too.
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	private static class SymmetrizeAndRemoveSelfLoops<T extends Comparable<T>, ET>
+	implements FlatMapFunction<Edge<T, ET>, Edge<T, ET>> {
+		private final boolean clipAndFlip;
+
+		public SymmetrizeAndRemoveSelfLoops(boolean clipAndFlip) {
+			this.clipAndFlip = clipAndFlip;
+		}
+
+		@Override
+		public void flatMap(Edge<T, ET> value, Collector<Edge<T, ET>> out) throws Exception {
+			int comparison = value.f0.compareTo(value.f1);
+
+			if ((clipAndFlip && comparison > 0) || (!clipAndFlip && comparison != 0)) {
+				out.collect(value);
+				// NOTE(review): swaps IDs on the already-emitted record; assumes collect() copied/serialized it
+				T temp = value.f0;
+				value.f0 = value.f1;
+				value.f1 = temp;
+
+				out.collect(value);
+			}
+		}
+	}
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
index 246d8bb..8a17b13 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -69,10 +69,6 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
private float noise = DEFAULT_NOISE;
- private boolean simpleGraph = false;
-
- private boolean clipAndFlip = false;
-
/**
* Generate a directed or undirected power-law {@link Graph} using the
* Recursive Matrix (R-Mat) model.
@@ -142,22 +138,6 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
return this;
}
- /**
- * When configured for a simple graph duplicate edges and self-loops will
- * be removed. The clip-and-flip method removes edges where source < target
- * before symmetrizing the graph.
- *
- * @param simpleGraph whether to generate a simple graph
- * @param clipAndFlip method for generating simple graph
- * @return this
- */
- public RMatGraph<T> setSimpleGraph(boolean simpleGraph, boolean clipAndFlip) {
- this.simpleGraph = simpleGraph;
- this.clipAndFlip = clipAndFlip;
-
- return this;
- }
-
@Override
public Graph<LongValue,NullValue,NullValue> generate() {
int scale = Long.SIZE - Long.numberOfLeadingZeros(vertexCount - 1);
@@ -168,27 +148,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
List<BlockInfo<T>> generatorBlocks = randomGenerableFactory
.getRandomGenerables(edgeCount, cyclesPerEdge);
- DataSet<Edge<LongValue,NullValue>> generatedEdges = env
+ DataSet<Edge<LongValue,NullValue>> edges = env
.fromCollection(generatorBlocks)
.name("Random generators")
.rebalance()
.setParallelism(parallelism)
.name("Rebalance")
- .flatMap(new GenerateEdges<T>(vertexCount, scale, A, B, C, noiseEnabled, noise, simpleGraph, clipAndFlip))
+ .flatMap(new GenerateEdges<T>(vertexCount, scale, A, B, C, noiseEnabled, noise))
.setParallelism(parallelism)
.name("RMat graph edges");
- DataSet<Edge<LongValue,NullValue>> edges;
-
- if (simpleGraph) {
- edges = generatedEdges
- .distinct(1, 0)
- .setParallelism(parallelism)
- .name("Distinct");
- } else {
- edges = generatedEdges;
- }
-
// Vertices
DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSet(edges, parallelism);
@@ -216,10 +185,6 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
private final float noise;
- private final boolean simpleGraph;
-
- private final boolean clipAndFlip;
-
// Output
private LongValue source = new LongValue();
@@ -229,7 +194,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
private Edge<LongValue,NullValue> targetToSource = new Edge<>(target, source, NullValue.getInstance());
- public GenerateEdges(long vertexCount, int scale, float A, float B, float C, boolean noiseEnabled, float noise, boolean simpleGraph, boolean clipAndFlip) {
+ public GenerateEdges(long vertexCount, int scale, float A, float B, float C, boolean noiseEnabled, float noise) {
this.vertexCount = vertexCount;
this.scale = scale;
this.A = A;
@@ -238,8 +203,6 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
this.D = 1.0f - A - B - C;
this.noiseEnabled = noiseEnabled;
this.noise = noise;
- this.simpleGraph = simpleGraph;
- this.clipAndFlip = clipAndFlip;
}
@Override
@@ -299,14 +262,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
source.setValue(x);
target.setValue(y);
- if (simpleGraph) {
- if ((clipAndFlip && x > y) || (!clipAndFlip && x != y)) {
- out.collect(sourceToTarget);
- out.collect(targetToSource);
- }
- } else {
- out.collect(sourceToTarget);
- }
+ out.collect(sourceToTarget);
edgesToGenerate--;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 0f843fa..10b538e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -56,7 +56,8 @@ public class AsmTestBase {
protected Graph<LongValue,NullValue,NullValue> undirectedRMatGraph;
@Before
- public void setup() {
+ public void setup()
+ throws Exception {
env = ExecutionEnvironment.createCollectionsEnvironment();
// the "fish" graph
@@ -92,11 +93,13 @@ public class AsmTestBase {
long rmatVertexCount = 1L << 10;
long rmatEdgeCount = 16 * rmatVertexCount;
- directedRMatGraph = new RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, rmatEdgeCount)
+ Graph<LongValue,NullValue,NullValue> rmatGraph = new RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, rmatEdgeCount)
.generate();
- undirectedRMatGraph = new RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, rmatEdgeCount)
- .setSimpleGraph(true, false)
- .generate();
+ directedRMatGraph = rmatGraph
+ .run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+
+ undirectedRMatGraph = rmatGraph
+ .run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
index 3fcb9dd..ec95fb4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -57,10 +57,10 @@ extends AsmTestBase {
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode degreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+ ChecksumHashCode degreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
.run(new EdgeDegreesPair<LongValue, NullValue, NullValue>()));
- assertEquals(16384, degreeChecksum.getCount());
- assertEquals(0x00001f68dfabd17cL, degreeChecksum.getChecksum());
+ assertEquals(12009, degreesChecksum.getCount());
+ assertEquals(0x00001660b256c74eL, degreesChecksum.getChecksum());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
index 2b22eea..cc0894e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -57,10 +57,10 @@ extends AsmTestBase {
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+ ChecksumHashCode sourceDegreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
.run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>()));
- assertEquals(16384, sourceDegreeChecksum.getCount());
- assertEquals(0x00001ec53bd55136L, sourceDegreeChecksum.getChecksum());
+ assertEquals(12009, sourceDegreesChecksum.getCount());
+ assertEquals(0x000015c4731764b0L, sourceDegreesChecksum.getChecksum());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
index 6840dc5..089552e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -57,10 +57,10 @@ extends AsmTestBase {
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+ ChecksumHashCode targetDegreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
.run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>()));
- assertEquals(16384, targetDegreeChecksum.getCount());
- assertEquals(0x00001f2867ba8b4fL, targetDegreeChecksum.getChecksum());
+ assertEquals(12009, targetDegreesChecksum.getCount());
+ assertEquals(0x000015e65749b923L, targetDegreesChecksum.getChecksum());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
index a0697a2..1577a50 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -95,10 +95,10 @@ extends AsmTestBase {
@Test
public void testWithRMatGraph()
throws Exception {
- ChecksumHashCode degreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+ ChecksumHashCode degreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
.run(new VertexDegrees<LongValue, NullValue, NullValue>()));
- assertEquals(902, degreeChecksum.getCount());
- assertEquals(0x0000015384f40cb6L, degreeChecksum.getChecksum());
+ assertEquals(902, degreesChecksum.getCount());
+ assertEquals(0x000001527b0f9e80L, degreesChecksum.getChecksum());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
index 0fa0fe5..5172594 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
@@ -83,6 +83,6 @@ extends AsmTestBase {
.setIncludeZeroDegreeVertices(true)));
assertEquals(902, inDegreeChecksum.getCount());
- assertEquals(0x0000000000e1e99cL, inDegreeChecksum.getChecksum());
+ assertEquals(0x0000000000e1d885L, inDegreeChecksum.getChecksum());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
index f7f3d48..7e2af7d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
@@ -83,6 +83,6 @@ extends AsmTestBase {
.setIncludeZeroDegreeVertices(true)));
assertEquals(902, outDegreeChecksum.getCount());
- assertEquals(0x0000000000e1e99cL, outDegreeChecksum.getChecksum());
+ assertEquals(0x0000000000e1d885L, outDegreeChecksum.getChecksum());
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
new file mode 100644
index 0000000..d7eb280
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.asm.simple.directed;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class SimplifyTest {
+
+ protected Graph<IntValue,NullValue,NullValue> graph;
+
+ @Before
+ public void setup() {
+ ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+
+ Object[][] edges = new Object[][]{
+ new Object[]{0, 0},
+ new Object[]{0, 1},
+ new Object[]{0, 1},
+ new Object[]{0, 2},
+ new Object[]{0, 2},
+ new Object[]{1, 0},
+ new Object[]{2, 2},
+ };
+
+ List<Edge<IntValue, NullValue>> edgeList = new LinkedList<>();
+
+ for (Object[] edge : edges) {
+ edgeList.add(new Edge<>(new IntValue((int) edge[0]), new IntValue((int) edge[1]), NullValue.getInstance()));
+ }
+
+ graph = Graph.fromCollection(edgeList, env);
+ }
+
+ @Test
+ public void test()
+ throws Exception {
+ String expectedResult =
+ "(0,1,(null))\n" +
+ "(0,2,(null))\n" +
+ "(1,0,(null))";
+
+ Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+ .run(new Simplify<IntValue, NullValue, NullValue>());
+
+ TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
new file mode 100644
index 0000000..01171bf
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
@@ -0,0 +1,89 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one
+ * * or more contributor license agreements. See the NOTICE file
+ * * distributed with this work for additional information
+ * * regarding copyright ownership. The ASF licenses this file
+ * * to you under the Apache License, Version 2.0 (the
+ * * "License"); you may not use this file except in compliance
+ * * with the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.asm.simple.undirected;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class SimplifyTest {
+
+ protected Graph<IntValue,NullValue,NullValue> graph;
+
+ @Before
+ public void setup() {
+ ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+
+ Object[][] edges = new Object[][]{
+ new Object[]{0, 0},
+ new Object[]{0, 1},
+ new Object[]{0, 1},
+ new Object[]{0, 2},
+ new Object[]{0, 2},
+ new Object[]{1, 0},
+ new Object[]{2, 2},
+ };
+
+ List<Edge<IntValue, NullValue>> edgeList = new LinkedList<>();
+
+ for (Object[] edge : edges) {
+ edgeList.add(new Edge<>(new IntValue((int) edge[0]), new IntValue((int) edge[1]), NullValue.getInstance()));
+ }
+
+ graph = Graph.fromCollection(edgeList, env);
+ }
+
+ @Test
+ public void testWithFullFlip()
+ throws Exception {
+ String expectedResult =
+ "(0,1,(null))\n" +
+ "(0,2,(null))\n" +
+ "(1,0,(null))\n" +
+ "(2,0,(null))";
+
+ Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+ .run(new Simplify<IntValue, NullValue, NullValue>(false));
+
+ TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
+ }
+
+ @Test
+ public void testWithClipAndFlip()
+ throws Exception {
+ String expectedResult =
+ "(0,1,(null))\n" +
+ "(1,0,(null))";
+
+ Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+ .run(new Simplify<IntValue, NullValue, NullValue>(true));
+
+ TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
+ }
+}
http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
index 2241dc9..5f81384 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.Utils.ChecksumHashCode;
import org.apache.flink.api.java.utils.DataSetUtils;
import org.apache.flink.graph.Graph;
import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
import org.apache.flink.graph.generator.RMatGraph;
import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
import org.apache.flink.graph.generator.random.RandomGenerableFactory;
@@ -121,8 +122,8 @@ extends AsmTestBase {
RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
- .setSimpleGraph(true, false)
- .generate();
+ .generate()
+ .run(new Simplify<LongValue, NullValue, NullValue>(false));
DataSet<Result<LongValue>> ji = graph
.run(new JaccardIndex<LongValue, NullValue, NullValue>()