You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/06/07 14:58:13 UTC

[3/3] flink git commit: [FLINK-4013] [gelly] GraphAlgorithms to simplify directed and undirected graphs

[FLINK-4013] [gelly] GraphAlgorithms to simplify directed and undirected graphs

This closes #2067


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7160a681
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7160a681
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7160a681

Branch: refs/heads/master
Commit: 7160a681240deab693aa4d69c24c4a8a63bb58ba
Parents: a611271
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Jun 2 16:01:00 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jun 7 09:03:26 2016 -0400

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  43 ++++++-
 .../apache/flink/graph/examples/Graph500.java   |  14 ++-
 .../flink/graph/examples/JaccardIndex.java      |   5 +-
 .../examples/LocalClusteringCoefficient.java    |   5 +-
 .../flink/graph/examples/TriangleListing.java   |   5 +-
 .../graph/asm/simple/directed/Simplify.java     |  91 ++++++++++++++
 .../graph/asm/simple/undirected/Simplify.java   | 126 +++++++++++++++++++
 .../apache/flink/graph/generator/RMatGraph.java |  52 +-------
 .../org/apache/flink/graph/asm/AsmTestBase.java |  13 +-
 .../annotate/directed/EdgeDegreesPairTest.java  |   6 +-
 .../directed/EdgeSourceDegreesTest.java         |   6 +-
 .../directed/EdgeTargetDegreesTest.java         |   6 +-
 .../annotate/directed/VertexDegreesTest.java    |   6 +-
 .../annotate/directed/VertexInDegreeTest.java   |   2 +-
 .../annotate/directed/VertexOutDegreeTest.java  |   2 +-
 .../graph/asm/simple/directed/SimplifyTest.java |  75 +++++++++++
 .../asm/simple/undirected/SimplifyTest.java     |  89 +++++++++++++
 .../library/similarity/JaccardIndexTest.java    |   5 +-
 18 files changed, 471 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index 1f7f271..7adff04 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -2315,6 +2315,34 @@ Graph<K, VV, EV> filteredGraph = graph
     </tr>
 
     <tr>
+      <td>simple.directed.<br/><strong>Simplify</strong></td>
+      <td>
+        <p>Remove self-loops and duplicate edges from a <a href="#graph-representation">directed graph</a>.</p>
+{% highlight java %}
+graph.run(new Simplify());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>simple.undirected.<br/><strong>Simplify</strong></td>
+      <td>
+        <p>Add symmetric edges and remove self-loops and duplicate edges from an <a href="#graph-representation">undirected graph</a>.</p>
+{% highlight java %}
+graph.run(new Simplify());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
       <td>translate.<br/><strong>TranslateGraphIds</strong></td>
       <td>
         <p>Translate vertex and edge IDs using the given <code>TranslateFunction</code>.</p>
@@ -2325,6 +2353,10 @@ graph.run(new TranslateGraphIds(new LongValueToStringValue()));
         <ul>
           <li><p><strong>translator</strong>: implements type or value conversion</p></li>
         </ul>
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
       </td>
     </tr>
 
@@ -2339,6 +2371,10 @@ graph.run(new TranslateVertexValues(new LongValueAddOffset(vertexCount)));
         <ul>
           <li><p><strong>translator</strong>: implements type or value conversion</p></li>
         </ul>
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
       </td>
     </tr>
 
@@ -2353,6 +2389,10 @@ graph.run(new TranslateEdgeValues(new Nullify()));
         <ul>
           <li><p><strong>translator</strong>: implements type or value conversion</p></li>
         </ul>
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
       </td>
     </tr>
   </tbody>
@@ -2855,7 +2895,6 @@ boolean clipAndFlip = false;
 Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
     .setConstants(0.57f, 0.19f, 0.19f)
     .setNoise(true, 0.10f)
-    .setSimpleGraph(true, clipAndFlip)
     .generate();
 {% endhighlight %}
 </div>
@@ -2872,7 +2911,7 @@ val edgeCount = edgeFactor * vertexCount
 
 clipAndFlip = false
 
-val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).setConstants(0.57f, 0.19f, 0.19f).setNoise(true, 0.10f).setSimpleGraph(true, clipAndFlip).generate()
+val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).setConstants(0.57f, 0.19f, 0.19f).setNoise(true, 0.10f).generate()
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
index 0daadc1..73bba2c 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
@@ -26,10 +26,13 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
 import org.apache.flink.graph.generator.random.RandomGenerableFactory;
 import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
 
 import java.text.NumberFormat;
 
@@ -69,9 +72,14 @@ public class Graph500 {
 		boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
 		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
 
-		DataSet<Tuple2<LongValue,LongValue>> edges = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-			.setSimpleGraph(simplify, clipAndFlip)
-			.generate()
+		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+			.generate();
+
+		if (simplify) {
+			graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+		}
+
+		DataSet<Tuple2<LongValue,LongValue>> edges = graph
 			.getEdges()
 			.project(0, 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index f8707d6..c078d73 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.asm.translate.LongValueToIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
@@ -118,8 +119,8 @@ public class JaccardIndex {
 				boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
 
 				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.setSimpleGraph(true, clipAndFlip)
-					.generate();
+					.generate()
+					.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
 
 				if (scale > 32) {
 					ji = graph

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
index 58e7cb6..bed68b2 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.asm.translate.LongValueToIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
@@ -74,8 +75,8 @@ public class LocalClusteringCoefficient {
 		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
 
 		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-			.setSimpleGraph(true, clipAndFlip)
-			.generate();
+			.generate()
+			.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
 
 		DataSet cc;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
index 2a9bb76..a20bf20 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.asm.translate.LongValueToIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
@@ -72,8 +73,8 @@ public class TriangleListing {
 		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
 
 		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-			.setSimpleGraph(true, clipAndFlip)
-			.generate();
+			.generate()
+			.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
 
 		DataSet tl;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
new file mode 100644
index 0000000..7362a3e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -0,0 +1,91 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.asm.simple.directed;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Remove self-loops and duplicate edges from a directed graph.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+
+	// Optional configuration
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public Simplify<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// Edges
+		DataSet<Edge<K, EV>> edges = input
+			.getEdges()
+			.filter(new RemoveSelfLoops<K, EV>())
+				.setParallelism(parallelism)
+				.name("Remove self-loops")
+			.distinct(0, 1)
+				.setParallelism(parallelism)
+				.name("Remove duplicate edges");
+
+		// Graph
+		return Graph.fromDataSet(input.getVertices(), edges, input.getContext());
+	}
+
+	/**
+	 * Filter out edges where the source and target vertex IDs are equal.
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	private static class RemoveSelfLoops<T extends Comparable<T>, ET>
+	implements FilterFunction<Edge<T, ET>> {
+		@Override
+		public boolean filter(Edge<T, ET> value) throws Exception {
+			return (value.f0.compareTo(value.f1) != 0);
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
new file mode 100644
index 0000000..13ac470
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -0,0 +1,126 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.asm.simple.undirected;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Add symmetric edges and remove self-loops and duplicate edges from an
+ * undirected graph.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+
+	// Required configuration
+	private boolean clipAndFlip;
+
+	// Optional configuration
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Simplifies an undirected graph by adding reverse edges and removing
+	 * self-loops and duplicate edges.
+	 *
+	 * When clip-and-flip is set, edges where source < target are removed
+	 * before symmetrizing the graph.
+	 *
+	 * @param clipAndFlip method for generating simple graph
+	 */
+	public Simplify(boolean clipAndFlip) {
+		this.clipAndFlip = clipAndFlip;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public Simplify<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// Edges
+		DataSet<Edge<K, EV>> edges = input
+			.getEdges()
+			.flatMap(new SymmetrizeAndRemoveSelfLoops<K, EV>(clipAndFlip))
+				.setParallelism(parallelism)
+				.name("Remove self-loops")
+			.distinct(0, 1)
+				.setParallelism(parallelism)
+				.name("Remove duplicate edges");
+
+		// Graph
+		return Graph.fromDataSet(input.getVertices(), edges, input.getContext());
+	}
+
+	/**
+	 * Filter out edges where the source and target vertex IDs are equal and
+	 * for each edge also emit an edge with the vertex IDs flipped.
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	private static class SymmetrizeAndRemoveSelfLoops<T extends Comparable<T>, ET>
+	implements FlatMapFunction<Edge<T, ET>, Edge<T, ET>> {
+		private boolean clipAndFlip;
+
+		public SymmetrizeAndRemoveSelfLoops(boolean clipAndFlip) {
+			this.clipAndFlip = clipAndFlip;
+		}
+
+		@Override
+		public void flatMap(Edge<T, ET> value, Collector<Edge<T, ET>> out) throws Exception {
+			int comparison = value.f0.compareTo(value.f1);
+
+			if ((clipAndFlip && comparison > 0) || (!clipAndFlip && comparison != 0)) {
+				out.collect(value);
+
+				T temp = value.f0;
+				value.f0 = value.f1;
+				value.f1 = temp;
+
+				out.collect(value);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
index 246d8bb..8a17b13 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -69,10 +69,6 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 	private float noise = DEFAULT_NOISE;
 
-	private boolean simpleGraph = false;
-
-	private boolean clipAndFlip = false;
-
 	/**
 	 * Generate a directed or undirected power-law {@link Graph} using the
 	 * Recursive Matrix (R-Mat) model.
@@ -142,22 +138,6 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 		return this;
 	}
 
-	/**
-	 * When configured for a simple graph duplicate edges and self-loops will
-	 * be removed. The clip-and-flip method removes edges where source < target
-	 * before symmetrizing the graph.
-	 *
-	 * @param simpleGraph whether to generate a simple graph
-	 * @param clipAndFlip method for generating simple graph
-	 * @return this
-	 */
-	public RMatGraph<T> setSimpleGraph(boolean simpleGraph, boolean clipAndFlip) {
-		this.simpleGraph = simpleGraph;
-		this.clipAndFlip = clipAndFlip;
-
-		return this;
-	}
-
 	@Override
 	public Graph<LongValue,NullValue,NullValue> generate() {
 		int scale = Long.SIZE - Long.numberOfLeadingZeros(vertexCount - 1);
@@ -168,27 +148,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 		List<BlockInfo<T>> generatorBlocks = randomGenerableFactory
 			.getRandomGenerables(edgeCount, cyclesPerEdge);
 
-		DataSet<Edge<LongValue,NullValue>> generatedEdges = env
+		DataSet<Edge<LongValue,NullValue>> edges = env
 			.fromCollection(generatorBlocks)
 				.name("Random generators")
 			.rebalance()
 				.setParallelism(parallelism)
 				.name("Rebalance")
-			.flatMap(new GenerateEdges<T>(vertexCount, scale, A, B, C, noiseEnabled, noise, simpleGraph, clipAndFlip))
+			.flatMap(new GenerateEdges<T>(vertexCount, scale, A, B, C, noiseEnabled, noise))
 				.setParallelism(parallelism)
 				.name("RMat graph edges");
 
-		DataSet<Edge<LongValue,NullValue>> edges;
-
-		if (simpleGraph) {
-			edges = generatedEdges
-				.distinct(1, 0)
-					.setParallelism(parallelism)
-					.name("Distinct");
-		} else {
-			edges = generatedEdges;
-		}
-
 		// Vertices
 		DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSet(edges, parallelism);
 
@@ -216,10 +185,6 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 		private final float noise;
 
-		private final boolean simpleGraph;
-
-		private final boolean clipAndFlip;
-
 		// Output
 		private LongValue source = new LongValue();
 
@@ -229,7 +194,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 		private Edge<LongValue,NullValue> targetToSource = new Edge<>(target, source, NullValue.getInstance());
 
-		public GenerateEdges(long vertexCount, int scale, float A, float B, float C, boolean noiseEnabled, float noise, boolean simpleGraph, boolean clipAndFlip) {
+		public GenerateEdges(long vertexCount, int scale, float A, float B, float C, boolean noiseEnabled, float noise) {
 			this.vertexCount = vertexCount;
 			this.scale = scale;
 			this.A = A;
@@ -238,8 +203,6 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 			this.D = 1.0f - A - B - C;
 			this.noiseEnabled = noiseEnabled;
 			this.noise = noise;
-			this.simpleGraph = simpleGraph;
-			this.clipAndFlip = clipAndFlip;
 		}
 
 		@Override
@@ -299,14 +262,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 					source.setValue(x);
 					target.setValue(y);
 
-					if (simpleGraph) {
-						if ((clipAndFlip && x > y) || (!clipAndFlip && x != y)) {
-							out.collect(sourceToTarget);
-							out.collect(targetToSource);
-						}
-					} else {
-						out.collect(sourceToTarget);
-					}
+					out.collect(sourceToTarget);
 
 					edgesToGenerate--;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 0f843fa..10b538e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -56,7 +56,8 @@ public class AsmTestBase {
 	protected Graph<LongValue,NullValue,NullValue> undirectedRMatGraph;
 
 	@Before
-	public void setup() {
+	public void setup()
+			throws Exception {
 		env = ExecutionEnvironment.createCollectionsEnvironment();
 
 		// the "fish" graph
@@ -92,11 +93,13 @@ public class AsmTestBase {
 		long rmatVertexCount = 1L << 10;
 		long rmatEdgeCount = 16 * rmatVertexCount;
 
-		directedRMatGraph = new RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, rmatEdgeCount)
+		Graph<LongValue,NullValue,NullValue> rmatGraph = new RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, rmatEdgeCount)
 			.generate();
 
-		undirectedRMatGraph = new RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, rmatEdgeCount)
-			.setSimpleGraph(true, false)
-			.generate();
+		directedRMatGraph = rmatGraph
+			.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+
+		undirectedRMatGraph = rmatGraph
+			.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
index 3fcb9dd..ec95fb4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -57,10 +57,10 @@ extends AsmTestBase {
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode degreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+		ChecksumHashCode degreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
 			.run(new EdgeDegreesPair<LongValue, NullValue, NullValue>()));
 
-		assertEquals(16384, degreeChecksum.getCount());
-		assertEquals(0x00001f68dfabd17cL, degreeChecksum.getChecksum());
+		assertEquals(12009, degreesChecksum.getCount());
+		assertEquals(0x00001660b256c74eL, degreesChecksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
index 2b22eea..cc0894e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -57,10 +57,10 @@ extends AsmTestBase {
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+		ChecksumHashCode sourceDegreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
 			.run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>()));
 
-		assertEquals(16384, sourceDegreeChecksum.getCount());
-		assertEquals(0x00001ec53bd55136L, sourceDegreeChecksum.getChecksum());
+		assertEquals(12009, sourceDegreesChecksum.getCount());
+		assertEquals(0x000015c4731764b0L, sourceDegreesChecksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
index 6840dc5..089552e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -57,10 +57,10 @@ extends AsmTestBase {
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+		ChecksumHashCode targetDegreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
 			.run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>()));
 
-		assertEquals(16384, targetDegreeChecksum.getCount());
-		assertEquals(0x00001f2867ba8b4fL, targetDegreeChecksum.getChecksum());
+		assertEquals(12009, targetDegreesChecksum.getCount());
+		assertEquals(0x000015e65749b923L, targetDegreesChecksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
index a0697a2..1577a50 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -95,10 +95,10 @@ extends AsmTestBase {
 	@Test
 	public void testWithRMatGraph()
 	throws Exception {
-		ChecksumHashCode degreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+		ChecksumHashCode degreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
 			.run(new VertexDegrees<LongValue, NullValue, NullValue>()));
 
-		assertEquals(902, degreeChecksum.getCount());
-		assertEquals(0x0000015384f40cb6L, degreeChecksum.getChecksum());
+		assertEquals(902, degreesChecksum.getCount());
+		assertEquals(0x000001527b0f9e80L, degreesChecksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
index 0fa0fe5..5172594 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
@@ -83,6 +83,6 @@ extends AsmTestBase {
 				.setIncludeZeroDegreeVertices(true)));
 
 		assertEquals(902, inDegreeChecksum.getCount());
-		assertEquals(0x0000000000e1e99cL, inDegreeChecksum.getChecksum());
+		assertEquals(0x0000000000e1d885L, inDegreeChecksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
index f7f3d48..7e2af7d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
@@ -83,6 +83,6 @@ extends AsmTestBase {
 				.setIncludeZeroDegreeVertices(true)));
 
 		assertEquals(902, outDegreeChecksum.getCount());
-		assertEquals(0x0000000000e1e99cL, outDegreeChecksum.getChecksum());
+		assertEquals(0x0000000000e1d885L, outDegreeChecksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
new file mode 100644
index 0000000..d7eb280
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
@@ -0,0 +1,75 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.asm.simple.directed;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class SimplifyTest {
+
+	protected Graph<IntValue,NullValue,NullValue> graph;
+
+	@Before
+	public void setup() {
+		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+
+		Object[][] edges = new Object[][]{
+			new Object[]{0, 0},
+			new Object[]{0, 1},
+			new Object[]{0, 1},
+			new Object[]{0, 2},
+			new Object[]{0, 2},
+			new Object[]{1, 0},
+			new Object[]{2, 2},
+		};
+
+		List<Edge<IntValue, NullValue>> edgeList = new LinkedList<>();
+
+		for (Object[] edge : edges) {
+			edgeList.add(new Edge<>(new IntValue((int) edge[0]), new IntValue((int) edge[1]), NullValue.getInstance()));
+		}
+
+		graph = Graph.fromCollection(edgeList, env);
+	}
+
+	@Test
+	public void test()
+			throws Exception {
+		String expectedResult =
+			"(0,1,(null))\n" +
+			"(0,2,(null))\n" +
+			"(1,0,(null))";
+
+		Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+			.run(new Simplify<IntValue, NullValue, NullValue>());
+
+		TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
new file mode 100644
index 0000000..01171bf
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
@@ -0,0 +1,89 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.asm.simple.undirected;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class SimplifyTest {
+
+	protected Graph<IntValue,NullValue,NullValue> graph;
+
+	@Before
+	public void setup() {
+		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+
+		Object[][] edges = new Object[][]{
+			new Object[]{0, 0},
+			new Object[]{0, 1},
+			new Object[]{0, 1},
+			new Object[]{0, 2},
+			new Object[]{0, 2},
+			new Object[]{1, 0},
+			new Object[]{2, 2},
+		};
+
+		List<Edge<IntValue, NullValue>> edgeList = new LinkedList<>();
+
+		for (Object[] edge : edges) {
+			edgeList.add(new Edge<>(new IntValue((int) edge[0]), new IntValue((int) edge[1]), NullValue.getInstance()));
+		}
+
+		graph = Graph.fromCollection(edgeList, env);
+	}
+
+	@Test
+	public void testWithFullFlip()
+			throws Exception {
+		String expectedResult =
+			"(0,1,(null))\n" +
+			"(0,2,(null))\n" +
+			"(1,0,(null))\n" +
+			"(2,0,(null))";
+
+		Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+			.run(new Simplify<IntValue, NullValue, NullValue>(false));
+
+		TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
+	}
+
+	@Test
+	public void testWithClipAndFlip()
+			throws Exception {
+		String expectedResult =
+			"(0,1,(null))\n" +
+			"(1,0,(null))";
+
+		Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+			.run(new Simplify<IntValue, NullValue, NullValue>(true));
+
+		TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
index 2241dc9..5f81384 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.Utils.ChecksumHashCode;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
 import org.apache.flink.graph.generator.random.RandomGenerableFactory;
@@ -121,8 +122,8 @@ extends AsmTestBase {
 		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
 
 		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-			.setSimpleGraph(true, false)
-			.generate();
+			.generate()
+			.run(new Simplify<LongValue, NullValue, NullValue>(false));
 
 		DataSet<Result<LongValue>> ji = graph
 			.run(new JaccardIndex<LongValue, NullValue, NullValue>()