You are viewing a plain-text version of this content. The hyperlink to the canonical version was not preserved in this copy.
Posted to commits@flink.apache.org by gr...@apache.org on 2016/06/07 14:58:11 UTC

[1/3] flink git commit: [FLINK-3980] [core] Remove ExecutionConfig.PARALLELISM_UNKNOWN

Repository: flink
Updated Branches:
  refs/heads/master 9c561fb48 -> 7160a6812


[FLINK-3980] [core] Remove ExecutionConfig.PARALLELISM_UNKNOWN

This closes #2064


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/21454973
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/21454973
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/21454973

Branch: refs/heads/master
Commit: 21454973ed4edfb3e27bc50ed0279f5708d54fa2
Parents: 9c561fb
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu May 26 15:34:29 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jun 7 09:01:02 2016 -0400

----------------------------------------------------------------------
 .../flink/api/common/ExecutionConfig.java       | 20 ++++++--------------
 .../flink/api/common/ExecutionConfigTest.java   | 13 +------------
 .../flink/api/java/operators/DataSink.java      | 16 ++++++++--------
 .../flink/api/java/operators/Operator.java      | 13 +++++--------
 .../flink/api/java/operator/OperatorTest.java   | 13 +------------
 .../annotate/directed/EdgeDegreesPair.java      |  5 +++--
 .../annotate/directed/EdgeSourceDegrees.java    |  5 +++--
 .../annotate/directed/EdgeTargetDegrees.java    |  5 +++--
 .../degree/annotate/directed/VertexDegrees.java |  5 +++--
 .../annotate/directed/VertexInDegree.java       |  9 +++++++--
 .../annotate/directed/VertexOutDegree.java      |  9 +++++++--
 .../annotate/undirected/EdgeDegreePair.java     |  9 +++++++--
 .../annotate/undirected/EdgeSourceDegree.java   |  9 +++++++--
 .../annotate/undirected/EdgeTargetDegree.java   |  9 +++++++--
 .../annotate/undirected/VertexDegree.java       |  9 +++++++--
 .../flink/graph/asm/translate/Translate.java    | 17 ++++-------------
 .../asm/translate/TranslateEdgeValues.java      |  5 ++---
 .../graph/asm/translate/TranslateGraphIds.java  |  5 ++---
 .../asm/translate/TranslateVertexValues.java    |  5 ++---
 .../undirected/LocalClusteringCoefficient.java  |  9 +++++++--
 .../clustering/undirected/TriangleListing.java  |  9 +++++++--
 .../graph/library/similarity/JaccardIndex.java  |  8 ++++++--
 22 files changed, 105 insertions(+), 102 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
----------------------------------------------------------------------
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
index 86d3be6..5b69794 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/ExecutionConfig.java
@@ -22,6 +22,7 @@ import com.esotericsoftware.kryo.Serializer;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.Public;
 import org.apache.flink.api.common.restartstrategy.RestartStrategies;
+import org.apache.flink.util.Preconditions;
 
 
 import java.io.Serializable;
@@ -73,13 +74,6 @@ public class ExecutionConfig implements Serializable {
 	 */
 	public static final int PARALLELISM_DEFAULT = -1;
 
-	/**
-	 * The flag value indicating an unknown or unset parallelism. This value is
-	 * not a valid parallelism and indicates that the parallelism should remain
-	 * unchanged.
-	 */
-	public static final int PARALLELISM_UNKNOWN = -2;
-
 	private static final long DEFAULT_RESTART_DELAY = 10000L;
 
 	// --------------------------------------------------------------------------------------------
@@ -225,13 +219,11 @@ public class ExecutionConfig implements Serializable {
 	 * @param parallelism The parallelism to use
 	 */
 	public ExecutionConfig setParallelism(int parallelism) {
-		if (parallelism != PARALLELISM_UNKNOWN) {
-			if (parallelism < 1 && parallelism != PARALLELISM_DEFAULT) {
-				throw new IllegalArgumentException(
-					"Parallelism must be at least one, or ExecutionConfig.PARALLELISM_DEFAULT (use system default).");
-			}
-			this.parallelism = parallelism;
-		}
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism of an operator must be at least 1.");
+
+		this.parallelism = parallelism;
+
 		return this;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
----------------------------------------------------------------------
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
index 103e06f..6dd47d8 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/ExecutionConfigTest.java
@@ -53,23 +53,12 @@ public class ExecutionConfigTest {
 	public void testConfigurationOfParallelism() {
 		ExecutionConfig config = new ExecutionConfig();
 
-		// verify that PARALLELISM_UNKNOWN does not change initial parallelism
-		int parallelism = config.getParallelism();
-		config.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN);
-
-		assertEquals(parallelism, config.getParallelism());
-
 		// verify explicit change in parallelism
-		parallelism = 36;
+		int parallelism = 36;
 		config.setParallelism(parallelism);
 
 		assertEquals(parallelism, config.getParallelism());
 
-		// verify that PARALLELISM_UNKNOWN does not change configured parallelism
-		config.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN);
-
-		assertEquals(parallelism, config.getParallelism());
-
 		// verify that parallelism is reset to default flag value
 		parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 		config.setParallelism(parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
index 2e2d237..8b419d9 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/DataSink.java
@@ -34,6 +34,7 @@ import org.apache.flink.api.common.typeinfo.NothingTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.api.java.DataSet;
+import org.apache.flink.util.Preconditions;
 
 import java.util.Arrays;
 
@@ -264,17 +265,16 @@ public class DataSink<T> {
 	/**
 	 * Sets the parallelism for this data sink.
 	 * The degree must be 1 or more.
-	 * 
-	 * @param parallelism The parallelism for this data sink.
+	 *
+	 * @param parallelism The parallelism for this data sink. A value equal to {@link ExecutionConfig#PARALLELISM_DEFAULT}
+	 *        will use the system default.
 	 * @return This data sink with set parallelism.
 	 */
 	public DataSink<T> setParallelism(int parallelism) {
-		if (parallelism != ExecutionConfig.PARALLELISM_UNKNOWN) {
-			if (parallelism < 1 && parallelism != ExecutionConfig.PARALLELISM_DEFAULT) {
-				throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
-			}
-			this.parallelism = parallelism;
-		}
+		Preconditions.checkArgument(parallelism > 0 || parallelism == ExecutionConfig.PARALLELISM_DEFAULT,
+			"The parallelism of an operator must be at least 1.");
+
+		this.parallelism = parallelism;
 
 		return this;
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
index 197c6d2..323d23e 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/operators/Operator.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.util.Preconditions;
 
 /**
  * Base class of all operators in the Java API.
@@ -89,18 +90,14 @@ public abstract class Operator<OUT, O extends Operator<OUT, O>> extends DataSet<
 	 * The parallelism must be 1 or more.
 	 * 
 	 * @param parallelism The parallelism for this operator. A value equal to {@link ExecutionConfig#PARALLELISM_DEFAULT}
-	 *        will use the system default and a value equal to {@link ExecutionConfig#PARALLELISM_UNKNOWN} will leave
-	 *        the parallelism unchanged.
+	 *        will use the system default.
 	 * @return The operator with set parallelism.
 	 */
 	public O setParallelism(int parallelism) {
-		if (parallelism != ExecutionConfig.PARALLELISM_UNKNOWN) {
-			if (parallelism < 1 && parallelism != ExecutionConfig.PARALLELISM_DEFAULT) {
-				throw new IllegalArgumentException("The parallelism of an operator must be at least 1.");
-			}
+		Preconditions.checkArgument(parallelism > 0 || parallelism == ExecutionConfig.PARALLELISM_DEFAULT,
+			"The parallelism of an operator must be at least 1.");
 
-			this.parallelism = parallelism;
-		}
+		this.parallelism = parallelism;
 
 		@SuppressWarnings("unchecked")
 		O returnType = (O) this;

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
index 4a17ca9..a69ca3c 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/operator/OperatorTest.java
@@ -32,23 +32,12 @@ public class OperatorTest {
 	public void testConfigurationOfParallelism() {
 		Operator operator = new MockOperator();
 
-		// verify that PARALLELISM_UNKNOWN does not change initial parallelism
-		int parallelism = operator.getParallelism();
-		operator.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN);
-
-		assertEquals(parallelism, operator.getParallelism());
-
 		// verify explicit change in parallelism
-		parallelism = 36;
+		int parallelism = 36;
 		operator.setParallelism(parallelism);
 
 		assertEquals(parallelism, operator.getParallelism());
 
-		// verify that PARALLELISM_UNKNOWN does not change configured parallelism
-		operator.setParallelism(ExecutionConfig.PARALLELISM_UNKNOWN);
-
-		assertEquals(parallelism, operator.getParallelism());
-
 		// verify that parallelism is reset to default flag value
 		parallelism = ExecutionConfig.PARALLELISM_DEFAULT;
 		operator.setParallelism(parallelism);

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
index 5aa5ca4..40af5ce 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -30,6 +29,8 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Annotates edges of a directed graph with the degree, out-degree, and
  * in-degree of both the source and target vertices.
@@ -42,7 +43,7 @@ public class EdgeDegreesPair<K, VV, EV>
 implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>>> {
 
 	// Optional configuration
-	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * Override the operator parallelism.

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
index 85ba0ed..e08ee56 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,6 +28,8 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Annotates edges of a directed graph with the degree, out-degree, and
  * in-degree of the source vertex.
@@ -41,7 +42,7 @@ public class EdgeSourceDegrees<K, VV, EV>
 implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
 
 	// Optional configuration
-	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * Override the operator parallelism.

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index 6d72e44..5110513 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,6 +28,8 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Annotates edges of a directed graph with the degree, out-degree, and
  * in-degree of the target vertex.
@@ -41,7 +42,7 @@ public class EdgeTargetDegrees<K, VV, EV>
 implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, Degrees>>>> {
 
 	// Optional configuration
-	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * Override the operator parallelism.

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index aab0eb6..71a0859 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
@@ -39,6 +38,8 @@ import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Annotates vertices of a directed graph with the degree, out-, and in-degree.
  *
@@ -52,7 +53,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, Degrees>>> {
 	// Optional configuration
 	private boolean includeZeroDegreeVertices = false;
 
-	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * By default only the edge set is processed for the computation of degree.

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 00acedb..1541abd 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
@@ -27,6 +26,9 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Degr
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId;
 import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates vertices of a directed graph with the in-degree.
@@ -41,7 +43,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 	// Optional configuration
 	private boolean includeZeroDegreeVertices = false;
 
-	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * By default only the edge set is processed for the computation of degree.
@@ -65,6 +67,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 	 * @return this
 	 */
 	public VertexInDegree<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
 		this.parallelism = parallelism;
 
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index bcca19d..22c0a67 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.asm.degree.annotate.directed;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
@@ -27,6 +26,9 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Degr
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinVertexWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
 import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates vertices of a directed graph with the out-degree.
@@ -41,7 +43,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 	// Optional configuration
 	private boolean includeZeroDegreeVertices = false;
 
-	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * By default only the edge set is processed for the computation of degree.
@@ -65,6 +67,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 	 * @return this
 	 */
 	public VertexOutDegree<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
 		this.parallelism = parallelism;
 
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index 8cc2e08..f27ea54 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.asm.degree.annotate.undirected;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -29,6 +28,9 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
 import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of an undirected graph with the degree of both the source
@@ -44,7 +46,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV
 	// Optional configuration
 	protected boolean reduceOnTargetId = false;
 
-	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * The degree can be counted from either the edge source or target IDs.
@@ -68,6 +70,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple3<EV, LongValue, LongV
 	 * @return this
 	 */
 	public EdgeDegreePair<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
 		this.parallelism = parallelism;
 
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index b9b59c5..2bba645 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.asm.degree.annotate.undirected;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -28,6 +27,9 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of an undirected graph with degree of the source vertex.
@@ -42,7 +44,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
 	// Optional configuration
 	private boolean reduceOnTargetId = false;
 
-	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * The degree can be counted from either the edge source or target IDs.
@@ -66,6 +68,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
 	 * @return this
 	 */
 	public EdgeSourceDegree<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
 		this.parallelism = parallelism;
 
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index eabfee7..6edaf17 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.asm.degree.annotate.undirected;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.tuple.Tuple2;
@@ -28,6 +27,9 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of an undirected graph with degree of the target vertex.
@@ -42,7 +44,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
 	// Optional configuration
 	private boolean reduceOnSourceId = false;
 
-	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * The degree can be counted from either the edge source or target IDs.
@@ -66,6 +68,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Edge<K, Tuple2<EV, LongValue>>>> {
 	 * @return this
 	 */
 	public EdgeTargetDegree<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
 		this.parallelism = parallelism;
 
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index 61a5e82..a2c8e03 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.asm.degree.annotate.undirected;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
@@ -30,6 +29,9 @@ import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.Join
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToSourceId;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.MapEdgeToTargetId;
 import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates vertices of an undirected graph with the degree.
@@ -46,7 +48,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 
 	private boolean reduceOnTargetId = false;
 
-	private int parallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * By default only the edge set is processed for the computation of degree.
@@ -85,6 +87,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Vertex<K, LongValue>>> {
 	 * @return this
 	 */
 	public VertexDegree<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
 		this.parallelism = parallelism;
 
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
index 70c20f8..f7cc601 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/Translate.java
@@ -30,7 +30,6 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
 
 /**
  * Methods for translation of the type or modification of the data of graph
@@ -53,7 +52,7 @@ public class Translate {
 	 * @return translated vertices
 	 */
 	public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexIds(DataSet<Vertex<OLD, VV>> vertices, TranslateFunction<OLD, NEW> translator) {
-		return translateVertexIds(vertices, translator, PARALLELISM_UNKNOWN);
+		return translateVertexIds(vertices, translator, PARALLELISM_DEFAULT);
 	}
 
 	/**
@@ -71,8 +70,6 @@ public class Translate {
 	public static <OLD, NEW, VV> DataSet<Vertex<NEW, VV>> translateVertexIds(DataSet<Vertex<OLD, VV>> vertices, TranslateFunction<OLD, NEW> translator, int parallelism) {
 		Preconditions.checkNotNull(vertices);
 		Preconditions.checkNotNull(translator);
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
-			"The parallelism must be greater than zero.");
 
 		Class<Vertex<NEW, VV>> vertexClass = (Class<Vertex<NEW, VV>>)(Class<? extends Vertex>) Vertex.class;
 		TypeInformation<OLD> oldType = ((TupleTypeInfo<Vertex<OLD, VV>>) vertices.getType()).getTypeAt(0);
@@ -130,7 +127,7 @@ public class Translate {
 	 * @return translated edges
 	 */
 	public static <OLD, NEW, EV> DataSet<Edge<NEW, EV>> translateEdgeIds(DataSet<Edge<OLD, EV>> edges, TranslateFunction<OLD, NEW> translator) {
-		return translateEdgeIds(edges, translator, PARALLELISM_UNKNOWN);
+		return translateEdgeIds(edges, translator, PARALLELISM_DEFAULT);
 	}
 
 	/**
@@ -148,8 +145,6 @@ public class Translate {
 	public static <OLD, NEW, EV> DataSet<Edge<NEW, EV>> translateEdgeIds(DataSet<Edge<OLD, EV>> edges, TranslateFunction<OLD, NEW> translator, int parallelism) {
 		Preconditions.checkNotNull(edges);
 		Preconditions.checkNotNull(translator);
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
-			"The parallelism must be greater than zero.");
 
 		Class<Edge<NEW, EV>> edgeClass = (Class<Edge<NEW, EV>>)(Class<? extends Edge>) Edge.class;
 		TypeInformation<OLD> oldType = ((TupleTypeInfo<Edge<OLD, EV>>) edges.getType()).getTypeAt(0);
@@ -208,7 +203,7 @@ public class Translate {
 	 * @return translated vertices
 	 */
 	public static <K, OLD, NEW> DataSet<Vertex<K, NEW>> translateVertexValues(DataSet<Vertex<K, OLD>> vertices, TranslateFunction<OLD, NEW> translator) {
-		return translateVertexValues(vertices, translator, PARALLELISM_UNKNOWN);
+		return translateVertexValues(vertices, translator, PARALLELISM_DEFAULT);
 	}
 
 	/**
@@ -226,8 +221,6 @@ public class Translate {
 	public static <K, OLD, NEW> DataSet<Vertex<K, NEW>> translateVertexValues(DataSet<Vertex<K, OLD>> vertices, TranslateFunction<OLD, NEW> translator, int parallelism) {
 		Preconditions.checkNotNull(vertices);
 		Preconditions.checkNotNull(translator);
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
-			"The parallelism must be greater than zero.");
 
 		Class<Vertex<K, NEW>> vertexClass = (Class<Vertex<K, NEW>>)(Class<? extends Vertex>) Vertex.class;
 		TypeInformation<K> idType = ((TupleTypeInfo<Vertex<K, OLD>>) vertices.getType()).getTypeAt(0);
@@ -285,7 +278,7 @@ public class Translate {
 	 * @return translated edges
 	 */
 	public static <K, OLD, NEW> DataSet<Edge<K, NEW>> translateEdgeValues(DataSet<Edge<K, OLD>> edges, TranslateFunction<OLD, NEW> translator) {
-		return translateEdgeValues(edges, translator, PARALLELISM_UNKNOWN);
+		return translateEdgeValues(edges, translator, PARALLELISM_DEFAULT);
 	}
 
 	/**
@@ -303,8 +296,6 @@ public class Translate {
 	public static <K, OLD, NEW> DataSet<Edge<K, NEW>> translateEdgeValues(DataSet<Edge<K, OLD>> edges, TranslateFunction<OLD, NEW> translator, int parallelism) {
 		Preconditions.checkNotNull(edges);
 		Preconditions.checkNotNull(translator);
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
-			"The parallelism must be greater than zero.");
 
 		Class<Edge<K, NEW>> edgeClass = (Class<Edge<K, NEW>>)(Class<? extends Edge>) Edge.class;
 		TypeInformation<K> idType = ((TupleTypeInfo<Edge<K, OLD>>) edges.getType()).getTypeAt(0);

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index 1023626..47ec077 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.GraphAlgorithm;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
 import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues;
 
 /**
@@ -43,7 +42,7 @@ implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> {
 	private TranslateFunction<OLD,NEW> translator;
 
 	// Optional configuration
-	private int parallelism = PARALLELISM_UNKNOWN;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * Translate {@link Edge} values using the given {@link TranslateFunction}.
@@ -63,7 +62,7 @@ implements GraphAlgorithm<K, VV, OLD, Graph<K, VV, NEW>> {
 	 * @return this
 	 */
 	public TranslateEdgeValues<K, VV, OLD, NEW> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
 			"The parallelism must be greater than zero.");
 
 		this.parallelism = parallelism;

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index 5b0f67f..6a06feb 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -26,7 +26,6 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
 import static org.apache.flink.graph.asm.translate.Translate.translateEdgeIds;
 import static org.apache.flink.graph.asm.translate.Translate.translateVertexIds;
 
@@ -45,7 +44,7 @@ implements GraphAlgorithm<OLD, VV, EV, Graph<NEW, VV, EV>> {
 	private TranslateFunction<OLD,NEW> translator;
 
 	// Optional configuration
-	private int parallelism = PARALLELISM_UNKNOWN;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * Translate {@link Vertex} and {@link Edge} IDs of a {@link Graph} using the given {@link TranslateFunction}
@@ -65,7 +64,7 @@ implements GraphAlgorithm<OLD, VV, EV, Graph<NEW, VV, EV>> {
 	 * @return this
 	 */
 	public TranslateGraphIds<OLD, NEW, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
 			"The parallelism must be greater than zero.");
 
 		this.parallelism = parallelism;

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index 4572bfe..3d0133a 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.Vertex;
 import org.apache.flink.util.Preconditions;
 
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_UNKNOWN;
 import static org.apache.flink.graph.asm.translate.Translate.translateVertexValues;
 
 /**
@@ -43,7 +42,7 @@ implements GraphAlgorithm<K, OLD, EV, Graph<K, NEW, EV>> {
 	private TranslateFunction<OLD, NEW> translator;
 
 	// Optional configuration
-	private int parallelism = PARALLELISM_UNKNOWN;
+	private int parallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * Translate {@link Vertex} values using the given {@link TranslateFunction}.
@@ -63,7 +62,7 @@ implements GraphAlgorithm<K, OLD, EV, Graph<K, NEW, EV>> {
 	 * @return this
 	 */
 	public TranslateVertexValues<K, OLD, NEW, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT || parallelism == PARALLELISM_UNKNOWN,
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
 			"The parallelism must be greater than zero.");
 
 		this.parallelism = parallelism;

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 0a562d5..d1618d1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.library.clustering.undirected;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
 import org.apache.flink.api.common.functions.ReduceFunction;
@@ -35,6 +34,9 @@ import org.apache.flink.graph.utils.Murmur3_32;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * The local clustering coefficient measures the connectedness of each vertex's
@@ -56,7 +58,7 @@ public class LocalClusteringCoefficient<K extends Comparable<K> & CopyableValue<
 implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 
 	// Optional configuration
-	private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+	private int littleParallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * Override the parallelism of operators processing small amounts of data.
@@ -65,6 +67,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 	 * @return this
 	 */
 	public LocalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
 		this.littleParallelism = littleParallelism;
 
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index e0ad30f..1319d02 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.library.clustering.undirected;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.functions.JoinFunction;
@@ -38,11 +37,14 @@ import org.apache.flink.graph.asm.degree.annotate.undirected.EdgeDegreePair;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
 
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Generates a listing of distinct triangles from the input graph.
  * <br/>
@@ -65,7 +67,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
 	// Optional configuration
 	private boolean sortTriangleVertices = false;
 
-	private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+	private int littleParallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * Normalize the triangle listing such that for each result (K0, K1, K2)
@@ -87,6 +89,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Tuple3<K, K, K>>> {
 	 * @return this
 	 */
 	public TriangleListing<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
 		this.littleParallelism = littleParallelism;
 
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/21454973/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index a9ea60e..d1c206e 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.graph.library.similarity;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.functions.FlatMapFunction;
 import org.apache.flink.api.common.functions.GroupReduceFunction;
 import org.apache.flink.api.common.operators.Order;
@@ -42,6 +41,8 @@ import org.apache.flink.util.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * The Jaccard Index measures the similarity between vertex neighborhoods and
  * is computed as the number of shared neighbors divided by the number of
@@ -77,7 +78,7 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 
 	private int maximumScoreDenominator = 0;
 
-	private int littleParallelism = ExecutionConfig.PARALLELISM_UNKNOWN;
+	private int littleParallelism = PARALLELISM_DEFAULT;
 
 	/**
 	 * Override the default group size for the quadratic expansion of neighbor
@@ -144,6 +145,9 @@ implements GraphAlgorithm<K, VV, EV, DataSet<Result<K>>> {
 	 * @return this
 	 */
 	public JaccardIndex<K, VV, EV> setLittleParallelism(int littleParallelism) {
+		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
 		this.littleParallelism = littleParallelism;
 
 		return this;


[2/3] flink git commit: [FLINK-3925] [gelly] GraphAlgorithm to filter by maximum degree

Posted by gr...@apache.org.
[FLINK-3925] [gelly] GraphAlgorithm to filter by maximum degree

This closes #2005


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a611271b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a611271b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a611271b

Branch: refs/heads/master
Commit: a611271b3ef7a084ec8e7edc4d4dc241550d7ad8
Parents: 2145497
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed May 18 11:28:24 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jun 7 09:03:21 2016 -0400

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  19 ++
 .../degree/filter/undirected/MaximumDegree.java | 231 +++++++++++++++++++
 .../filter/undirected/MaximumDegreeTest.java    |  71 ++++++
 3 files changed, 321 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a611271b/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index 05fbcb5..1f7f271 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -2296,6 +2296,25 @@ DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = graph
     </tr>
 
     <tr>
+      <td>degree.filter.undirected.<br/><strong>MaximumDegree</strong></td>
+      <td>
+        <p>Filter an <a href="#graph-representation">undirected graph</a> by maximum degree.</p>
+{% highlight java %}
+Graph<K, VV, EV> filteredGraph = graph
+  .run(new MaximumDegree(5000)
+    .setBroadcastHighDegreeVertices(true)
+    .setReduceOnTargetId(true));
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setBroadcastHighDegreeVertices</strong>: join high-degree vertices using a broadcast-hash to reduce data shuffling when removing a relatively small number of high-degree vertices.</p></li>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+          <li><p><strong>setReduceOnTargetId</strong>: the degree can be counted from either the edge source or target IDs. By default the source IDs are counted. Reducing on target IDs may optimize the algorithm if the input edge list is sorted by target ID.</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
       <td>translate.<br/><strong>TranslateGraphIds</strong></td>
       <td>
         <p>Translate vertex and edge IDs using the given <code>TranslateFunction</code>.</p>

http://git-wip-us.apache.org/repos/asf/flink/blob/a611271b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
new file mode 100644
index 0000000..e7d78bb
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.filter.undirected;
+
+import org.apache.flink.api.common.functions.FlatJoinFunction;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.operators.base.JoinOperatorBase.JoinHint;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFields;
+import org.apache.flink.api.java.functions.FunctionAnnotation.ForwardedFieldsFirst;
+import org.apache.flink.api.java.tuple.Tuple1;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.graph.Vertex;
+import org.apache.flink.graph.asm.degree.annotate.undirected.VertexDegree;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Removes vertices from a graph with degree greater than the given maximum.
+ * Any edge whose source or target vertex has degree greater than the
+ * given maximum is also removed.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class MaximumDegree<K, VV, EV>
+implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+
+	// Required configuration
+	private final long maximumDegree;
+
+	// Optional configuration
+	private boolean reduceOnTargetId = false;
+
+	private boolean broadcastHighDegreeVertices = false;
+
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Filter out vertices with degree greater than the given maximum.
+	 *
+	 * @param maximumDegree maximum degree; must be greater than zero
+	 */
+	public MaximumDegree(long maximumDegree) {
+		Preconditions.checkArgument(maximumDegree > 0, "Maximum degree must be greater than zero");
+
+		this.maximumDegree = maximumDegree;
+	}
+
+	/**
+	 * The degree can be counted from either the edge source or target IDs.
+	 * By default the source IDs are counted. Reducing on target IDs may
+	 * optimize the algorithm if the input edge list is sorted by target ID.
+	 *
+	 * @param reduceOnTargetId set to {@code true} if the input edge list
+	 *                         is sorted by target ID
+	 * @return this
+	 */
+	public MaximumDegree<K, VV, EV> setReduceOnTargetId(boolean reduceOnTargetId) {
+		this.reduceOnTargetId = reduceOnTargetId;
+
+		return this;
+	}
+
+	/**
+	 * After filtering high-degree vertices this algorithm must perform joins
+	 * on the original graph's vertex set and on both the source and target IDs
+	 * of the edge set. These joins can be performed without shuffling data
+	 * over the network if the high-degree vertices are distributed by a
+	 * broadcast-hash.
+	 *
+	 * @param broadcastHighDegreeVertices set to {@code true} if the high-degree
+	 *                                    vertices should be broadcast when joining
+	 * @return this
+	 */
+	public MaximumDegree<K, VV, EV> setBroadcastHighDegreeVertices(boolean broadcastHighDegreeVertices) {
+		this.broadcastHighDegreeVertices = broadcastHighDegreeVertices;
+
+		return this;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism; positive or PARALLELISM_DEFAULT
+	 * @return this
+	 */
+	public MaximumDegree<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+		this.parallelism = parallelism;
+		return this;
+	}
+
+	/*
+	 * Implementation notes:
+	 *
+	 * The three leftOuterJoins below could be implemented more efficiently as anti-joins when available in Flink.
+	 */
+
+	@Override
+	public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// u, d(u)
+		DataSet<Vertex<K, LongValue>> vertexDegree = input
+			.run(new VertexDegree<K, VV, EV>()
+				.setReduceOnTargetId(reduceOnTargetId)
+				.setParallelism(parallelism));
+
+		// u if d(u) > maximumDegree
+		DataSet<Tuple1<K>> highDegreeVertices = vertexDegree
+			.flatMap(new DegreeFilter<K>(maximumDegree))
+				.setParallelism(parallelism)
+				.name("Filter high-degree vertices");
+
+		JoinHint joinHint = broadcastHighDegreeVertices ? JoinHint.BROADCAST_HASH_SECOND : JoinHint.REPARTITION_HASH_SECOND;
+
+		// Vertices
+		DataSet<Vertex<K, VV>> vertices = input
+			.getVertices()
+			.leftOuterJoin(highDegreeVertices, joinHint)
+			.where(0)
+			.equalTo(0)
+			.with(new ProjectVertex<K, VV>())
+				.setParallelism(parallelism)
+				.name("Project low-degree vertices");
+
+		// Edges
+		DataSet<Edge<K, EV>> edges = input
+			.getEdges()
+			.leftOuterJoin(highDegreeVertices, joinHint)
+			.where(reduceOnTargetId ? 1 : 0)
+			.equalTo(0)
+			.with(new ProjectEdge<K, EV>())
+				.setParallelism(parallelism)
+				.name("Project low-degree edges by " + (reduceOnTargetId ? "target" : "source"))
+			.leftOuterJoin(highDegreeVertices, joinHint)
+			.where(reduceOnTargetId ? 0 : 1)
+			.equalTo(0)
+			.with(new ProjectEdge<K, EV>())
+				.setParallelism(parallelism)
+				.name("Project low-degree edges by " + (reduceOnTargetId ? "source" : "target"));
+
+		// Graph
+		return Graph.fromDataSet(vertices, edges, input.getContext());
+	}
+
+	/**
+	 * Emit the IDs of vertices with degree greater than the given maximum.
+	 *
+	 * @param <K> ID type
+	 */
+	@ForwardedFields("0")
+	private static class DegreeFilter<K>
+	implements FlatMapFunction<Vertex<K, LongValue>, Tuple1<K>> {
+		private final long maximumDegree;
+
+		private final Tuple1<K> output = new Tuple1<>();
+
+		public DegreeFilter(long maximumDegree) {
+			this.maximumDegree = maximumDegree;
+		}
+
+		@Override
+		public void flatMap(Vertex<K, LongValue> value, Collector<Tuple1<K>> out)
+				throws Exception {
+			if (value.f1.getValue() > maximumDegree) {
+				output.f0 = value.f0;
+				out.collect(output);
+			}
+		}
+	}
+
+	/**
+	 * Emit the vertex only when it matched no high-degree vertex ID.
+	 *
+	 * @param <T> ID type
+	 * @param <VT> vertex value type
+	 */
+	@ForwardedFieldsFirst("0; 1")
+	private static class ProjectVertex<T, VT>
+	implements FlatJoinFunction<Vertex<T, VT>, Tuple1<T>, Vertex<T, VT>> {
+		@Override
+		public void join(Vertex<T, VT> vertex, Tuple1<T> id, Collector<Vertex<T, VT>> out)
+				throws Exception {
+			if (id == null) {
+				out.collect(vertex);
+			}
+		}
+	}
+
+	/**
+	 * Emit the edge only when its joined endpoint matched no high-degree vertex ID.
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	@ForwardedFieldsFirst("0; 1; 2")
+	private static class ProjectEdge<T, ET>
+	implements FlatJoinFunction<Edge<T, ET>, Tuple1<T>, Edge<T, ET>> {
+		@Override
+		public void join(Edge<T, ET> edge, Tuple1<T> id, Collector<Edge<T, ET>> out)
+				throws Exception {
+			if (id == null) {
+				out.collect(edge);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a611271b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
new file mode 100644
index 0000000..b3a3356
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegreeTest.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.graph.asm.degree.filter.undirected;
+
+import org.apache.flink.api.java.Utils.ChecksumHashCode;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.utils.GraphUtils;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Test;
+
+import static org.junit.Assert.assertEquals;
+
+public class MaximumDegreeTest // tests for the undirected MaximumDegree graph filter
+extends AsmTestBase {
+
+	@Test
+	public void testWithSimpleGraph()
+			throws Exception {
+		Graph<IntValue, NullValue, NullValue> graph = undirectedSimpleGraph
+			.run(new MaximumDegree<IntValue, NullValue, NullValue>(3)); // maximum degree of 3
+
+		String expectedVerticesResult = // vertex 3 is expected to be removed
+			"(0,(null))\n" +
+			"(1,(null))\n" +
+			"(2,(null))\n" +
+			"(4,(null))\n" +
+			"(5,(null))";
+
+		TestBaseUtils.compareResultAsText(graph.getVertices().collect(), expectedVerticesResult);
+
+		String expectedEdgesResult = // only edges among vertices 0, 1 and 2 are expected to remain
+			"(0,1,(null))\n" +
+			"(0,2,(null))\n" +
+			"(1,0,(null))\n" +
+			"(1,2,(null))\n" +
+			"(2,0,(null))\n" +
+			"(2,1,(null))";
+
+		TestBaseUtils.compareResultAsText(graph.getEdges().collect(), expectedEdgesResult);
+	}
+
+	@Test
+	public void testWithRMatGraph()
+			throws Exception {
+		ChecksumHashCode checksum = GraphUtils.checksumHashCode(undirectedRMatGraph
+			.run(new MaximumDegree<LongValue, NullValue, NullValue>(16)));
+
+		assertEquals(805, checksum.getCount()); // regression values, presumably captured from a reference run
+		assertEquals(0x0000000008028b43L, checksum.getChecksum());
+	}
+}


[3/3] flink git commit: [FLINK-4013] [gelly] GraphAlgorithms to simplify directed and undirected graphs

Posted by gr...@apache.org.
[FLINK-4013] [gelly] GraphAlgorithms to simplify directed and undirected graphs

This closes #2067


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/7160a681
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/7160a681
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/7160a681

Branch: refs/heads/master
Commit: 7160a681240deab693aa4d69c24c4a8a63bb58ba
Parents: a611271
Author: Greg Hogan <co...@greghogan.com>
Authored: Thu Jun 2 16:01:00 2016 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jun 7 09:03:26 2016 -0400

----------------------------------------------------------------------
 docs/apis/batch/libs/gelly.md                   |  43 ++++++-
 .../apache/flink/graph/examples/Graph500.java   |  14 ++-
 .../flink/graph/examples/JaccardIndex.java      |   5 +-
 .../examples/LocalClusteringCoefficient.java    |   5 +-
 .../flink/graph/examples/TriangleListing.java   |   5 +-
 .../graph/asm/simple/directed/Simplify.java     |  91 ++++++++++++++
 .../graph/asm/simple/undirected/Simplify.java   | 126 +++++++++++++++++++
 .../apache/flink/graph/generator/RMatGraph.java |  52 +-------
 .../org/apache/flink/graph/asm/AsmTestBase.java |  13 +-
 .../annotate/directed/EdgeDegreesPairTest.java  |   6 +-
 .../directed/EdgeSourceDegreesTest.java         |   6 +-
 .../directed/EdgeTargetDegreesTest.java         |   6 +-
 .../annotate/directed/VertexDegreesTest.java    |   6 +-
 .../annotate/directed/VertexInDegreeTest.java   |   2 +-
 .../annotate/directed/VertexOutDegreeTest.java  |   2 +-
 .../graph/asm/simple/directed/SimplifyTest.java |  75 +++++++++++
 .../asm/simple/undirected/SimplifyTest.java     |  89 +++++++++++++
 .../library/similarity/JaccardIndexTest.java    |   5 +-
 18 files changed, 471 insertions(+), 80 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/docs/apis/batch/libs/gelly.md
----------------------------------------------------------------------
diff --git a/docs/apis/batch/libs/gelly.md b/docs/apis/batch/libs/gelly.md
index 1f7f271..7adff04 100644
--- a/docs/apis/batch/libs/gelly.md
+++ b/docs/apis/batch/libs/gelly.md
@@ -2315,6 +2315,34 @@ Graph<K, VV, EV> filteredGraph = graph
     </tr>
 
     <tr>
+      <td>simple.directed.<br/><strong>Simplify</strong></td>
+      <td>
+        <p>Remove self-loops and duplicate edges from a <a href="#graph-representation">directed graph</a>.</p>
+{% highlight java %}
+graph.run(new Simplify());
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
+      <td>simple.undirected.<br/><strong>Simplify</strong></td>
+      <td>
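+        <p>Add symmetric edges and remove self-loops and duplicate edges from an <a href="#graph-representation">undirected graph</a>. The required <code>clipAndFlip</code> argument selects the clip-and-flip method, which keeps only edges where source &gt; target before symmetrizing.</p>
+{% highlight java %}
+graph.run(new Simplify(clipAndFlip));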
+{% endhighlight %}
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
+      </td>
+    </tr>
+
+    <tr>
       <td>translate.<br/><strong>TranslateGraphIds</strong></td>
       <td>
         <p>Translate vertex and edge IDs using the given <code>TranslateFunction</code>.</p>
@@ -2325,6 +2353,10 @@ graph.run(new TranslateGraphIds(new LongValueToStringValue()));
         <ul>
           <li><p><strong>translator</strong>: implements type or value conversion</p></li>
         </ul>
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
       </td>
     </tr>
 
@@ -2339,6 +2371,10 @@ graph.run(new TranslateVertexValues(new LongValueAddOffset(vertexCount)));
         <ul>
           <li><p><strong>translator</strong>: implements type or value conversion</p></li>
         </ul>
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
       </td>
     </tr>
 
@@ -2353,6 +2389,10 @@ graph.run(new TranslateEdgeValues(new Nullify()));
         <ul>
           <li><p><strong>translator</strong>: implements type or value conversion</p></li>
         </ul>
+        <p>Optional configuration:</p>
+        <ul>
+          <li><p><strong>setParallelism</strong>: override the operator parallelism</p></li>
+        </ul>
       </td>
     </tr>
   </tbody>
@@ -2855,7 +2895,6 @@ boolean clipAndFlip = false;
 Graph<LongValue,NullValue,NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
     .setConstants(0.57f, 0.19f, 0.19f)
     .setNoise(true, 0.10f)
-    .setSimpleGraph(true, clipAndFlip)
     .generate();
 {% endhighlight %}
 </div>
@@ -2872,7 +2911,7 @@ val edgeCount = edgeFactor * vertexCount
 
 clipAndFlip = false
 
-val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).setConstants(0.57f, 0.19f, 0.19f).setNoise(true, 0.10f).setSimpleGraph(true, clipAndFlip).generate()
+val graph = new RMatGraph(env.getJavaEnv, rnd, vertexCount, edgeCount).setConstants(0.57f, 0.19f, 0.19f).setNoise(true, 0.10f).generate()
 {% endhighlight %}
 </div>
 </div>

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
index 0daadc1..73bba2c 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/Graph500.java
@@ -26,10 +26,13 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
 import org.apache.flink.graph.generator.random.RandomGenerableFactory;
 import org.apache.flink.types.LongValue;
+import org.apache.flink.types.NullValue;
 
 import java.text.NumberFormat;
 
@@ -69,9 +72,14 @@ public class Graph500 {
 		boolean simplify = parameters.getBoolean("simplify", DEFAULT_SIMPLIFY);
 		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
 
-		DataSet<Tuple2<LongValue,LongValue>> edges = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-			.setSimpleGraph(simplify, clipAndFlip)
-			.generate()
+		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
+			.generate();
+
+		if (simplify) {
+			graph = graph.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
+		}
+
+		DataSet<Tuple2<LongValue,LongValue>> edges = graph
 			.getEdges()
 			.project(0, 1);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
index f8707d6..c078d73 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/JaccardIndex.java
@@ -28,6 +28,7 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.asm.translate.LongValueToIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
@@ -118,8 +119,8 @@ public class JaccardIndex {
 				boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
 
 				Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-					.setSimpleGraph(true, clipAndFlip)
-					.generate();
+					.generate()
+					.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
 
 				if (scale > 32) {
 					ji = graph

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
index 58e7cb6..bed68b2 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/LocalClusteringCoefficient.java
@@ -27,6 +27,7 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.asm.translate.LongValueToIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
@@ -74,8 +75,8 @@ public class LocalClusteringCoefficient {
 		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
 
 		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-			.setSimpleGraph(true, clipAndFlip)
-			.generate();
+			.generate()
+			.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
 
 		DataSet cc;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
index 2a9bb76..a20bf20 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/examples/TriangleListing.java
@@ -26,6 +26,7 @@ import org.apache.flink.api.java.io.CsvOutputFormat;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.asm.translate.LongValueToIntValue;
 import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.generator.RMatGraph;
@@ -72,8 +73,8 @@ public class TriangleListing {
 		boolean clipAndFlip = parameters.getBoolean("clip_and_flip", DEFAULT_CLIP_AND_FLIP);
 
 		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-			.setSimpleGraph(true, clipAndFlip)
-			.generate();
+			.generate()
+			.run(new Simplify<LongValue, NullValue, NullValue>(clipAndFlip));
 
 		DataSet tl;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
new file mode 100644
index 0000000..7362a3e
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.asm.simple.directed;
+
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Remove self-loops and duplicate edges from a directed graph; vertices are kept as-is.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+
+	// Optional configuration
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism, positive or {@code PARALLELISM_DEFAULT}
+	 * @return this
+	 */
+	public Simplify<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// Edges: drop self-loops, then edges repeating an earlier (source, target) pair
+		DataSet<Edge<K, EV>> edges = input
+			.getEdges()
+			.filter(new RemoveSelfLoops<K, EV>())
+				.setParallelism(parallelism)
+				.name("Remove self-loops")
+			.distinct(0, 1) // keyed on (source, target); which edge value survives is not specified here
+				.setParallelism(parallelism)
+				.name("Remove duplicate edges");
+
+		// Graph: the input vertex set is reused unchanged
+		return Graph.fromDataSet(input.getVertices(), edges, input.getContext());
+	}
+
+	/**
+	 * Filter out edges where the source and target vertex IDs are equal (per {@code compareTo}).
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	private static class RemoveSelfLoops<T extends Comparable<T>, ET>
+	implements FilterFunction<Edge<T, ET>> {
+		@Override
+		public boolean filter(Edge<T, ET> value) throws Exception {
+			return (value.f0.compareTo(value.f1) != 0); // keep only when source != target
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
new file mode 100644
index 0000000..13ac470
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -0,0 +1,126 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.asm.simple.undirected;
+
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.types.CopyableValue;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
+/**
+ * Add symmetric edges and remove self-loops and duplicate edges from an
+ * undirected graph; vertices are kept as-is.
+ *
+ * @param <K> ID type
+ * @param <VV> vertex value type
+ * @param <EV> edge value type
+ */
+public class Simplify<K extends Comparable<K> & CopyableValue<K>, VV, EV>
+implements GraphAlgorithm<K, VV, EV, Graph<K, VV, EV>> {
+
+	// Required configuration
+	private final boolean clipAndFlip;
+
+	// Optional configuration
+	private int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Simplifies an undirected graph by adding reverse edges and removing
+	 * self-loops and duplicate edges.
+	 *
+	 * When clip-and-flip is set, edges where {@code source < target} are removed
+	 * before symmetrizing the graph.
+	 *
+	 * @param clipAndFlip whether to keep only edges with {@code source > target} before symmetrizing
+	 */
+	public Simplify(boolean clipAndFlip) {
+		this.clipAndFlip = clipAndFlip;
+	}
+
+	/**
+	 * Override the operator parallelism.
+	 *
+	 * @param parallelism operator parallelism, positive or {@code PARALLELISM_DEFAULT}
+	 * @return this
+	 */
+	public Simplify<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
+	@Override
+	public Graph<K, VV, EV> run(Graph<K, VV, EV> input)
+			throws Exception {
+		// Edges: emit each kept edge in both directions, then drop repeated (source, target) pairs
+		DataSet<Edge<K, EV>> edges = input
+			.getEdges()
+			.flatMap(new SymmetrizeAndRemoveSelfLoops<K, EV>(clipAndFlip))
+				.setParallelism(parallelism)
+				.name("Remove self-loops")
+			.distinct(0, 1)
+				.setParallelism(parallelism)
+				.name("Remove duplicate edges");
+
+		// Graph: the input vertex set is reused unchanged
+		return Graph.fromDataSet(input.getVertices(), edges, input.getContext());
+	}
+
+	/**
+	 * Filter out self-loops (and, when clip-and-flip is set, edges with {@code source < target})
+	 * and for each kept edge also emit an edge with the vertex IDs flipped.
+	 *
+	 * @param <T> ID type
+	 * @param <ET> edge value type
+	 */
+	private static class SymmetrizeAndRemoveSelfLoops<T extends Comparable<T>, ET>
+	implements FlatMapFunction<Edge<T, ET>, Edge<T, ET>> {
+		private final boolean clipAndFlip;
+
+		public SymmetrizeAndRemoveSelfLoops(boolean clipAndFlip) {
+			this.clipAndFlip = clipAndFlip;
+		}
+
+		@Override
+		public void flatMap(Edge<T, ET> value, Collector<Edge<T, ET>> out) throws Exception {
+			int comparison = value.f0.compareTo(value.f1);
+
+			if ((clipAndFlip && comparison > 0) || (!clipAndFlip && comparison != 0)) { // self-loops never pass
+				out.collect(value);
+
+				T temp = value.f0; // swap in place, then emit the same object again as the reverse edge
+				value.f0 = value.f1;
+				value.f1 = temp;
+
+				out.collect(value);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
index 246d8bb..8a17b13 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/RMatGraph.java
@@ -69,10 +69,6 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 	private float noise = DEFAULT_NOISE;
 
-	private boolean simpleGraph = false;
-
-	private boolean clipAndFlip = false;
-
 	/**
 	 * Generate a directed or undirected power-law {@link Graph} using the
 	 * Recursive Matrix (R-Mat) model.
@@ -142,22 +138,6 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 		return this;
 	}
 
-	/**
-	 * When configured for a simple graph duplicate edges and self-loops will
-	 * be removed. The clip-and-flip method removes edges where source < target
-	 * before symmetrizing the graph.
-	 *
-	 * @param simpleGraph whether to generate a simple graph
-	 * @param clipAndFlip method for generating simple graph
-	 * @return this
-	 */
-	public RMatGraph<T> setSimpleGraph(boolean simpleGraph, boolean clipAndFlip) {
-		this.simpleGraph = simpleGraph;
-		this.clipAndFlip = clipAndFlip;
-
-		return this;
-	}
-
 	@Override
 	public Graph<LongValue,NullValue,NullValue> generate() {
 		int scale = Long.SIZE - Long.numberOfLeadingZeros(vertexCount - 1);
@@ -168,27 +148,16 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 		List<BlockInfo<T>> generatorBlocks = randomGenerableFactory
 			.getRandomGenerables(edgeCount, cyclesPerEdge);
 
-		DataSet<Edge<LongValue,NullValue>> generatedEdges = env
+		DataSet<Edge<LongValue,NullValue>> edges = env
 			.fromCollection(generatorBlocks)
 				.name("Random generators")
 			.rebalance()
 				.setParallelism(parallelism)
 				.name("Rebalance")
-			.flatMap(new GenerateEdges<T>(vertexCount, scale, A, B, C, noiseEnabled, noise, simpleGraph, clipAndFlip))
+			.flatMap(new GenerateEdges<T>(vertexCount, scale, A, B, C, noiseEnabled, noise))
 				.setParallelism(parallelism)
 				.name("RMat graph edges");
 
-		DataSet<Edge<LongValue,NullValue>> edges;
-
-		if (simpleGraph) {
-			edges = generatedEdges
-				.distinct(1, 0)
-					.setParallelism(parallelism)
-					.name("Distinct");
-		} else {
-			edges = generatedEdges;
-		}
-
 		// Vertices
 		DataSet<Vertex<LongValue,NullValue>> vertices = GraphGeneratorUtils.vertexSet(edges, parallelism);
 
@@ -216,10 +185,6 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 		private final float noise;
 
-		private final boolean simpleGraph;
-
-		private final boolean clipAndFlip;
-
 		// Output
 		private LongValue source = new LongValue();
 
@@ -229,7 +194,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 
 		private Edge<LongValue,NullValue> targetToSource = new Edge<>(target, source, NullValue.getInstance());
 
-		public GenerateEdges(long vertexCount, int scale, float A, float B, float C, boolean noiseEnabled, float noise, boolean simpleGraph, boolean clipAndFlip) {
+		public GenerateEdges(long vertexCount, int scale, float A, float B, float C, boolean noiseEnabled, float noise) {
 			this.vertexCount = vertexCount;
 			this.scale = scale;
 			this.A = A;
@@ -238,8 +203,6 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 			this.D = 1.0f - A - B - C;
 			this.noiseEnabled = noiseEnabled;
 			this.noise = noise;
-			this.simpleGraph = simpleGraph;
-			this.clipAndFlip = clipAndFlip;
 		}
 
 		@Override
@@ -299,14 +262,7 @@ extends AbstractGraphGenerator<LongValue, NullValue, NullValue> {
 					source.setValue(x);
 					target.setValue(y);
 
-					if (simpleGraph) {
-						if ((clipAndFlip && x > y) || (!clipAndFlip && x != y)) {
-							out.collect(sourceToTarget);
-							out.collect(targetToSource);
-						}
-					} else {
-						out.collect(sourceToTarget);
-					}
+					out.collect(sourceToTarget);
 
 					edgesToGenerate--;
 				}

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
index 0f843fa..10b538e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/AsmTestBase.java
@@ -56,7 +56,8 @@ public class AsmTestBase {
 	protected Graph<LongValue,NullValue,NullValue> undirectedRMatGraph;
 
 	@Before
-	public void setup() {
+	public void setup()
+			throws Exception {
 		env = ExecutionEnvironment.createCollectionsEnvironment();
 
 		// the "fish" graph
@@ -92,11 +93,13 @@ public class AsmTestBase {
 		long rmatVertexCount = 1L << 10;
 		long rmatEdgeCount = 16 * rmatVertexCount;
 
-		directedRMatGraph = new RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, rmatEdgeCount)
+		Graph<LongValue,NullValue,NullValue> rmatGraph = new RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, rmatEdgeCount)
 			.generate();
 
-		undirectedRMatGraph = new RMatGraph<>(env, new JDKRandomGeneratorFactory(), rmatVertexCount, rmatEdgeCount)
-			.setSimpleGraph(true, false)
-			.generate();
+		directedRMatGraph = rmatGraph
+			.run(new org.apache.flink.graph.asm.simple.directed.Simplify<LongValue, NullValue, NullValue>());
+
+		undirectedRMatGraph = rmatGraph
+			.run(new org.apache.flink.graph.asm.simple.undirected.Simplify<LongValue, NullValue, NullValue>(false));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
index 3fcb9dd..ec95fb4 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPairTest.java
@@ -57,10 +57,10 @@ extends AsmTestBase {
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode degreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+		ChecksumHashCode degreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
 			.run(new EdgeDegreesPair<LongValue, NullValue, NullValue>()));
 
-		assertEquals(16384, degreeChecksum.getCount());
-		assertEquals(0x00001f68dfabd17cL, degreeChecksum.getChecksum());
+		assertEquals(12009, degreesChecksum.getCount());
+		assertEquals(0x00001660b256c74eL, degreesChecksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
index 2b22eea..cc0894e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegreesTest.java
@@ -57,10 +57,10 @@ extends AsmTestBase {
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode sourceDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+		ChecksumHashCode sourceDegreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
 			.run(new EdgeSourceDegrees<LongValue, NullValue, NullValue>()));
 
-		assertEquals(16384, sourceDegreeChecksum.getCount());
-		assertEquals(0x00001ec53bd55136L, sourceDegreeChecksum.getChecksum());
+		assertEquals(12009, sourceDegreesChecksum.getCount());
+		assertEquals(0x000015c4731764b0L, sourceDegreesChecksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
index 6840dc5..089552e 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegreesTest.java
@@ -57,10 +57,10 @@ extends AsmTestBase {
 	@Test
 	public void testWithRMatGraph()
 			throws Exception {
-		ChecksumHashCode targetDegreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+		ChecksumHashCode targetDegreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
 			.run(new EdgeTargetDegrees<LongValue, NullValue, NullValue>()));
 
-		assertEquals(16384, targetDegreeChecksum.getCount());
-		assertEquals(0x00001f2867ba8b4fL, targetDegreeChecksum.getChecksum());
+		assertEquals(12009, targetDegreesChecksum.getCount());
+		assertEquals(0x000015e65749b923L, targetDegreesChecksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
index a0697a2..1577a50 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegreesTest.java
@@ -95,10 +95,10 @@ extends AsmTestBase {
 	@Test
 	public void testWithRMatGraph()
 	throws Exception {
-		ChecksumHashCode degreeChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
+		ChecksumHashCode degreesChecksum = DataSetUtils.checksumHashCode(directedRMatGraph
 			.run(new VertexDegrees<LongValue, NullValue, NullValue>()));
 
-		assertEquals(902, degreeChecksum.getCount());
-		assertEquals(0x0000015384f40cb6L, degreeChecksum.getChecksum());
+		assertEquals(902, degreesChecksum.getCount());
+		assertEquals(0x000001527b0f9e80L, degreesChecksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
index 0fa0fe5..5172594 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegreeTest.java
@@ -83,6 +83,6 @@ extends AsmTestBase {
 				.setIncludeZeroDegreeVertices(true)));
 
 		assertEquals(902, inDegreeChecksum.getCount());
-		assertEquals(0x0000000000e1e99cL, inDegreeChecksum.getChecksum());
+		assertEquals(0x0000000000e1d885L, inDegreeChecksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
index f7f3d48..7e2af7d 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegreeTest.java
@@ -83,6 +83,6 @@ extends AsmTestBase {
 				.setIncludeZeroDegreeVertices(true)));
 
 		assertEquals(902, outDegreeChecksum.getCount());
-		assertEquals(0x0000000000e1e99cL, outDegreeChecksum.getChecksum());
+		assertEquals(0x0000000000e1d885L, outDegreeChecksum.getChecksum());
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
new file mode 100644
index 0000000..d7eb280
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/directed/SimplifyTest.java
@@ -0,0 +1,75 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.asm.simple.directed;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class SimplifyTest {
+	// Checks directed Simplify against a fixture containing self-loops and duplicate edges.
+	protected Graph<IntValue,NullValue,NullValue> graph;
+
+	@Before
+	public void setup() {
+		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+		// Each row is a (source, target) pair; rows may repeat and may be self-loops.
+		Object[][] edges = new Object[][]{
+			new Object[]{0, 0},  // self-loop
+			new Object[]{0, 1},
+			new Object[]{0, 1},  // duplicate of (0,1)
+			new Object[]{0, 2},
+			new Object[]{0, 2},  // duplicate of (0,2)
+			new Object[]{1, 0},  // reverse of (0,1), a distinct directed edge
+			new Object[]{2, 2},  // self-loop
+		};
+
+		List<Edge<IntValue, NullValue>> edgeList = new LinkedList<>();
+		// Wrap each pair as IntValue vertex ids with a NullValue edge value.
+		for (Object[] edge : edges) {
+			edgeList.add(new Edge<>(new IntValue((int) edge[0]), new IntValue((int) edge[1]), NullValue.getInstance()));
+		}
+
+		graph = Graph.fromCollection(edgeList, env);
+	}
+
+	@Test
+	public void test()
+			throws Exception {
+		String expectedResult =
+			"(0,1,(null))\n" +
+			"(0,2,(null))\n" +
+			"(1,0,(null))";
+		// Self-loops (0,0), (2,2) and the duplicate rows are expected to be dropped; (1,0) is kept alongside (0,1).
+		Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+			.run(new Simplify<IntValue, NullValue, NullValue>());
+
+		TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
new file mode 100644
index 0000000..01171bf
--- /dev/null
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/asm/simple/undirected/SimplifyTest.java
@@ -0,0 +1,89 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one
+ *  * or more contributor license agreements.  See the NOTICE file
+ *  * distributed with this work for additional information
+ *  * regarding copyright ownership.  The ASF licenses this file
+ *  * to you under the Apache License, Version 2.0 (the
+ *  * "License"); you may not use this file except in compliance
+ *  * with the License.  You may obtain a copy of the License at
+ *  *
+ *  *     http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.flink.graph.asm.simple.undirected;
+
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.flink.graph.Edge;
+import org.apache.flink.graph.Graph;
+import org.apache.flink.test.util.TestBaseUtils;
+import org.apache.flink.types.IntValue;
+import org.apache.flink.types.NullValue;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.LinkedList;
+import java.util.List;
+
+public class SimplifyTest {
+	// Checks undirected Simplify with its boolean constructor flag set to false and to true.
+	protected Graph<IntValue,NullValue,NullValue> graph;
+
+	@Before
+	public void setup() {
+		ExecutionEnvironment env = ExecutionEnvironment.createCollectionsEnvironment();
+		// Each row is a (source, target) pair; rows may repeat and may be self-loops.
+		Object[][] edges = new Object[][]{
+			new Object[]{0, 0},  // self-loop
+			new Object[]{0, 1},
+			new Object[]{0, 1},  // duplicate of (0,1)
+			new Object[]{0, 2},
+			new Object[]{0, 2},  // duplicate of (0,2)
+			new Object[]{1, 0},  // reverse of (0,1); the only row with source > target
+			new Object[]{2, 2},  // self-loop
+		};
+
+		List<Edge<IntValue, NullValue>> edgeList = new LinkedList<>();
+		// Wrap each pair as IntValue vertex ids with a NullValue edge value.
+		for (Object[] edge : edges) {
+			edgeList.add(new Edge<>(new IntValue((int) edge[0]), new IntValue((int) edge[1]), NullValue.getInstance()));
+		}
+
+		graph = Graph.fromCollection(edgeList, env);
+	}
+
+	@Test
+	public void testWithFullFlip()
+			throws Exception {
+		String expectedResult =
+			"(0,1,(null))\n" +
+			"(0,2,(null))\n" +
+			"(1,0,(null))\n" +
+			"(2,0,(null))";
+		// Flag false: self-loops and duplicates are expected to vanish, and (0,1) and (0,2) appear in both directions.
+		Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+			.run(new Simplify<IntValue, NullValue, NullValue>(false));
+
+		TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
+	}
+
+	@Test
+	public void testWithClipAndFlip()
+			throws Exception {
+		String expectedResult =
+			"(0,1,(null))\n" +
+			"(1,0,(null))";
+		// Flag true: per this expectation only (1,0) (source > target) survives, emitted in both directions; (0,2) is clipped.
+		Graph<IntValue,NullValue,NullValue> simpleGraph = graph
+			.run(new Simplify<IntValue, NullValue, NullValue>(true));
+
+		TestBaseUtils.compareResultAsText(simpleGraph.getEdges().collect(), expectedResult);
+	}
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/7160a681/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
index 2241dc9..5f81384 100644
--- a/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
+++ b/flink-libraries/flink-gelly/src/test/java/org/apache/flink/graph/library/similarity/JaccardIndexTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.api.java.Utils.ChecksumHashCode;
 import org.apache.flink.api.java.utils.DataSetUtils;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.asm.AsmTestBase;
+import org.apache.flink.graph.asm.simple.undirected.Simplify;
 import org.apache.flink.graph.generator.RMatGraph;
 import org.apache.flink.graph.generator.random.JDKRandomGeneratorFactory;
 import org.apache.flink.graph.generator.random.RandomGenerableFactory;
@@ -121,8 +122,8 @@ extends AsmTestBase {
 		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
 
 		Graph<LongValue, NullValue, NullValue> graph = new RMatGraph<>(env, rnd, vertexCount, edgeCount)
-			.setSimpleGraph(true, false)
-			.generate();
+			.generate()
+			.run(new Simplify<LongValue, NullValue, NullValue>(false));
 
 		DataSet<Result<LongValue>> ji = graph
 			.run(new JaccardIndex<LongValue, NullValue, NullValue>()