You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/11 13:57:25 UTC

[7/7] flink git commit: [FLINK-7019] [gelly] Rework parallelism in Gelly algorithms and examples

[FLINK-7019] [gelly] Rework parallelism in Gelly algorithms and examples

Flink job parallelism is set with ExecutionConfig#setParallelism or with
-p on the command-line. The Gelly algorithms JaccardIndex, AdamicAdar,
TriangleListing, and ClusteringCoefficient have intermediate operators
which generate output quadratic in the size of input. These algorithms
may need to be run with a high parallelism, but doing so for all
operations is wasteful. For this reason a separate "little parallelism"
setting was introduced for the operators processing small amounts of
data.

This can be simplified by moving the parallelism parameter to the new
common base class, with the rule of thumb that the algorithm
parallelism is used for all normal (small-output) operators. The
asymptotically large operators default to the job parallelism, as does
the algorithm parallelism itself when it is not explicitly set.

This closes #4282


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0cc2c17
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0cc2c17
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0cc2c17

Branch: refs/heads/master
Commit: d0cc2c178714987ba23998486651791d04a5beb1
Parents: 273223f
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Jun 26 10:21:50 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jul 11 08:51:23 2017 -0400

----------------------------------------------------------------------
 docs/dev/libs/gelly/library_methods.md          | 14 ++---
 .../apache/flink/graph/drivers/AdamicAdar.java  | 10 +---
 .../graph/drivers/ClusteringCoefficient.java    | 20 +++----
 .../apache/flink/graph/drivers/DriverBase.java  |  6 ++
 .../apache/flink/graph/drivers/EdgeList.java    |  3 +-
 .../flink/graph/drivers/GraphMetrics.java       | 12 ++--
 .../org/apache/flink/graph/drivers/HITS.java    |  5 +-
 .../flink/graph/drivers/JaccardIndex.java       |  9 +--
 .../apache/flink/graph/drivers/PageRank.java    |  7 ++-
 .../flink/graph/drivers/TriangleListing.java    | 16 ++----
 .../graph/drivers/input/CirculantGraph.java     |  6 +-
 .../graph/drivers/input/CompleteGraph.java      |  6 +-
 .../flink/graph/drivers/input/CycleGraph.java   |  6 +-
 .../flink/graph/drivers/input/EchoGraph.java    |  6 +-
 .../flink/graph/drivers/input/EmptyGraph.java   |  1 +
 .../graph/drivers/input/GeneratedGraph.java     |  6 ++
 .../flink/graph/drivers/input/GridGraph.java    |  8 +--
 .../graph/drivers/input/HypercubeGraph.java     |  6 +-
 .../flink/graph/drivers/input/PathGraph.java    |  6 +-
 .../flink/graph/drivers/input/RMatGraph.java    | 11 +---
 .../graph/drivers/input/SingletonEdgeGraph.java |  6 +-
 .../flink/graph/drivers/input/StarGraph.java    |  6 +-
 .../apache/flink/graph/GraphAnalyticBase.java   | 25 ++++++++
 .../annotate/directed/EdgeDegreesPair.java      | 35 ------------
 .../annotate/directed/EdgeSourceDegrees.java    | 35 ------------
 .../annotate/directed/EdgeTargetDegrees.java    | 35 ------------
 .../degree/annotate/directed/VertexDegrees.java | 38 +++----------
 .../annotate/directed/VertexInDegree.java       | 41 +++----------
 .../annotate/directed/VertexOutDegree.java      | 41 +++----------
 .../annotate/undirected/EdgeDegreePair.java     | 32 +----------
 .../annotate/undirected/EdgeSourceDegree.java   | 32 +----------
 .../annotate/undirected/EdgeTargetDegree.java   | 32 +----------
 .../annotate/undirected/VertexDegree.java       | 41 +++----------
 .../degree/filter/undirected/MaximumDegree.java | 37 +++---------
 .../graph/asm/simple/directed/Simplify.java     | 38 -------------
 .../graph/asm/simple/undirected/Simplify.java   | 40 +------------
 .../asm/translate/TranslateEdgeValues.java      | 38 +------------
 .../graph/asm/translate/TranslateGraphIds.java  | 40 +------------
 .../asm/translate/TranslateVertexValues.java    | 40 +------------
 .../graph/generator/GraphGeneratorBase.java     |  5 ++
 .../library/clustering/TriangleListingBase.java | 34 +----------
 .../directed/AverageClusteringCoefficient.java  | 19 +------
 .../directed/GlobalClusteringCoefficient.java   | 24 ++------
 .../directed/LocalClusteringCoefficient.java    | 48 ++++------------
 .../clustering/directed/TriadicCensus.java      | 21 +------
 .../clustering/directed/TriangleListing.java    | 10 ++--
 .../AverageClusteringCoefficient.java           | 19 +------
 .../undirected/GlobalClusteringCoefficient.java | 24 ++------
 .../undirected/LocalClusteringCoefficient.java  | 47 ++++-----------
 .../clustering/undirected/TriadicCensus.java    | 24 ++------
 .../clustering/undirected/TriangleListing.java  |  6 +-
 .../flink/graph/library/linkanalysis/HITS.java  | 34 ++---------
 .../graph/library/linkanalysis/PageRank.java    | 34 ++---------
 .../library/metric/directed/EdgeMetrics.java    | 16 ------
 .../library/metric/directed/VertexMetrics.java  | 16 ------
 .../library/metric/undirected/EdgeMetrics.java  | 16 ------
 .../metric/undirected/VertexMetrics.java        | 16 ------
 .../graph/library/similarity/AdamicAdar.java    | 53 ++++-------------
 .../graph/library/similarity/JaccardIndex.java  | 58 ++++++-------------
 .../utils/proxy/GraphAlgorithmWrappingBase.java | 60 ++++++++++++++++++--
 .../proxy/GraphAlgorithmWrappingDataSet.java    |  4 +-
 .../proxy/GraphAlgorithmWrappingGraph.java      |  4 +-
 62 files changed, 293 insertions(+), 1095 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/docs/dev/libs/gelly/library_methods.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/library_methods.md b/docs/dev/libs/gelly/library_methods.md
index 026bea4..de53aaf 100644
--- a/docs/dev/libs/gelly/library_methods.md
+++ b/docs/dev/libs/gelly/library_methods.md
@@ -225,7 +225,7 @@ Directed and undirected variants are provided. The analytics take a simple graph
 containing the total number of vertices and average clustering coefficient of the graph. The graph ID type must be
 `Comparable` and `Copyable`.
 
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 
 ### Global Clustering Coefficient
 
@@ -244,7 +244,7 @@ Directed and undirected variants are provided. The analytics take a simple graph
 containing the total number of triplets and triangles in the graph. The result class provides a method to compute the
 global clustering coefficient score. The graph ID type must be `Comparable` and `Copyable`.
 
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 
 ### Local Clustering Coefficient
 
@@ -266,7 +266,7 @@ provides a method to compute the local clustering coefficient score. The graph I
 `Copyable`.
 
 * `setIncludeZeroDegreeVertices`: include results for vertices with a degree of zero
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 
 ### Triadic Census
 
@@ -286,7 +286,7 @@ Directed and undirected variants are provided. The analytics take a simple graph
 `AnalyticResult` with accessor methods for querying the count of each triad type. The graph ID type must be
 `Comparable` and `Copyable`.
 
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 
 ### Triangle Listing
 
@@ -306,7 +306,7 @@ Directed and undirected variants are provided. The algorithms take a simple grap
 `TertiaryResult` containing the three triangle vertices and, for the directed algorithm, a bitmask marking each of the
 six potential edges connecting the three vertices. The graph ID type must be `Comparable` and `Copyable`.
 
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 * `setSortTriangleVertices`: normalize the triangle listing such that for each result (K0, K1, K2) the vertex IDs are sorted K0 < K1 < K2
 
 ## Link Analysis
@@ -424,9 +424,9 @@ See the [Jaccard Index](#jaccard-index) library method for a similar algorithm.
 The algorithm takes a simple undirected graph as input and outputs a `DataSet` of `BinaryResult` containing two vertex
 IDs and the Adamic-Adar similarity score. The graph ID type must be `Copyable`.
 
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
 * `setMinimumRatio`: filter out Adamic-Adar scores less than the given ratio times the average score
 * `setMinimumScore`: filter out Adamic-Adar scores less than the given minimum
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 
 ### Jaccard Index
 
@@ -448,8 +448,8 @@ The algorithm takes a simple undirected graph as input and outputs a `DataSet` o
 the number of shared neighbors, and the number of distinct neighbors. The result class provides a method to compute the
 Jaccard Index score. The graph ID type must be `Copyable`.
 
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
 * `setMaximumScore`: filter out Jaccard Index scores greater than or equal to the given maximum fraction
 * `setMinimumScore`: filter out Jaccard Index scores less than the given minimum fraction
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
index e439ccd..5dd8ea1 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
@@ -21,14 +21,11 @@ package org.apache.flink.graph.drivers;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.parameter.DoubleParameter;
-import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.CopyableValue;
 
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}.
  */
@@ -43,9 +40,6 @@ extends DriverBase<K, VV, EV> {
 		.setDefaultValue(0.0)
 		.setMinimumValue(0.0, true);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getShortDescription() {
 		return "similarity score weighted by centerpoint degree";
@@ -64,12 +58,10 @@ extends DriverBase<K, VV, EV> {
 
 	@Override
 	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
-		int lp = littleParallelism.getValue().intValue();
-
 		return graph
 			.run(new org.apache.flink.graph.library.similarity.AdamicAdar<K, VV, EV>()
 				.setMinimumRatio(minRatio.getValue().floatValue())
 				.setMinimumScore(minScore.getValue().floatValue())
-				.setLittleParallelism(lp));
+				.setParallelism(parallelism.getValue().intValue()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index 14e953a..e10208d 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -23,7 +23,6 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
-import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.CopyableValue;
 
 import org.apache.commons.lang3.text.StrBuilder;
@@ -31,8 +30,6 @@ import org.apache.commons.lang3.text.WordUtils;
 
 import java.io.PrintStream;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Driver for directed and undirected clustering coefficient algorithm and analytics.
  *
@@ -53,9 +50,6 @@ extends DriverBase<K, VV, EV> {
 	private ChoiceParameter order = new ChoiceParameter(this, "order")
 		.addChoices(DIRECTED, UNDIRECTED);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> globalClusteringCoefficient;
 
 	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> averageClusteringCoefficient;
@@ -81,37 +75,37 @@ extends DriverBase<K, VV, EV> {
 
 	@Override
 	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
-		int lp = littleParallelism.getValue().intValue();
+		int parallelism = this.parallelism.getValue().intValue();
 
 		switch (order.getValue()) {
 			case DIRECTED:
 				globalClusteringCoefficient = graph
 					.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<K, VV, EV>()
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 
 				averageClusteringCoefficient = graph
 					.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<K, VV, EV>()
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 
 				@SuppressWarnings("unchecked")
 				DataSet<PrintableResult> directedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
 					.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<K, VV, EV>()
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 				return directedResult;
 
 			case UNDIRECTED:
 				globalClusteringCoefficient = graph
 					.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<K, VV, EV>()
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 
 				averageClusteringCoefficient = graph
 					.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<K, VV, EV>()
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 
 				@SuppressWarnings("unchecked")
 				DataSet<PrintableResult> undirectedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
 					.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<K, VV, EV>()
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 				return undirectedResult;
 
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java
index 38e4ea84..5b5a684 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.graph.drivers;
 
+import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 
 import java.io.PrintStream;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Base class for example drivers.
  *
@@ -33,6 +36,9 @@ public abstract class DriverBase<K, VV, EV>
 extends ParameterizedBase
 implements Driver<K, VV, EV> {
 
+	protected LongParameter parallelism = new LongParameter(this, "__parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
 	@Override
 	public String getName() {
 		return this.getClass().getSimpleName();

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
index 287e222..563908c 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
@@ -55,7 +55,8 @@ extends DriverBase<K, VV, EV> {
 		if (hasNullValueEdges(edges)) {
 			return edges
 				.map(new EdgeToTuple2Map<K, EV>())
-				.name("Edge to Tuple2");
+				.name("Edge to Tuple2")
+				.setParallelism(parallelism.getValue().intValue());
 		} else {
 			return edges;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index f4da13c..61ea60d 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -85,18 +85,22 @@ extends DriverBase<K, VV, EV> {
 		switch (order.getValue()) {
 			case DIRECTED:
 				vertexMetrics = graph
-					.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<K, VV, EV>());
+					.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<K, VV, EV>()
+						.setParallelism(parallelism.getValue().intValue()));
 
 				edgeMetrics = graph
-					.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<K, VV, EV>());
+					.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<K, VV, EV>()
+						.setParallelism(parallelism.getValue().intValue()));
 				break;
 
 			case UNDIRECTED:
 				vertexMetrics = graph
-					.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<K, VV, EV>());
+					.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<K, VV, EV>()
+						.setParallelism(parallelism.getValue().intValue()));
 
 				edgeMetrics = graph
-					.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<K, VV, EV>());
+					.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<K, VV, EV>()
+						.setParallelism(parallelism.getValue().intValue()));
 				break;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index 1987421..3c83c4e 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -55,7 +55,8 @@ extends DriverBase<K, VV, EV> {
 	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
 		return graph
 			.run(new org.apache.flink.graph.library.linkanalysis.HITS<K, VV, EV>(
-				iterationConvergence.getValue().iterations,
-				iterationConvergence.getValue().convergenceThreshold));
+					iterationConvergence.getValue().iterations,
+					iterationConvergence.getValue().convergenceThreshold)
+				.setParallelism(parallelism.getValue().intValue()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index 8f6cfb7..7c1639a 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -27,8 +27,6 @@ import org.apache.flink.types.CopyableValue;
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}.
  */
@@ -53,9 +51,6 @@ extends DriverBase<K, VV, EV> {
 
 	private BooleanParameter mirrorResults = new BooleanParameter(this, "mirror_results");
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getShortDescription() {
 		return "similarity score as fraction of common neighbors";
@@ -76,13 +71,11 @@ extends DriverBase<K, VV, EV> {
 
 	@Override
 	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
-		int lp = littleParallelism.getValue().intValue();
-
 		return graph
 			.run(new org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>()
 				.setMinimumScore(minNumerator.getValue().intValue(), minDenominator.getValue().intValue())
 				.setMaximumScore(maxNumerator.getValue().intValue(), maxDenominator.getValue().intValue())
 				.setMirrorResults(mirrorResults.getValue())
-				.setLittleParallelism(lp));
+				.setParallelism(parallelism.getValue().intValue()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
index 224dea8..299aeed 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
@@ -60,8 +60,9 @@ extends DriverBase<K, VV, EV> {
 	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
 		return graph
 			.run(new org.apache.flink.graph.library.linkanalysis.PageRank<K, VV, EV>(
-				dampingFactor.getValue(),
-				iterationConvergence.getValue().iterations,
-				iterationConvergence.getValue().convergenceThreshold));
+					dampingFactor.getValue(),
+					iterationConvergence.getValue().iterations,
+					iterationConvergence.getValue().convergenceThreshold)
+				.setParallelism(parallelism.getValue().intValue()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 86a61d5..4a7d230 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.drivers.parameter.BooleanParameter;
 import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
-import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.CopyableValue;
 
 import org.apache.commons.lang3.text.StrBuilder;
@@ -32,8 +31,6 @@ import org.apache.commons.lang3.text.WordUtils;
 
 import java.io.PrintStream;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Driver for directed and undirected triangle listing algorithm and analytic.
  *
@@ -56,9 +53,6 @@ extends DriverBase<K, VV, EV> {
 
 	private BooleanParameter computeTriadicCensus = new BooleanParameter(this, "triadic_census");
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> triadicCensus;
 
 	@Override
@@ -79,35 +73,35 @@ extends DriverBase<K, VV, EV> {
 
 	@Override
 	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
-		int lp = littleParallelism.getValue().intValue();
+		int parallelism = this.parallelism.getValue().intValue();
 
 		switch (order.getValue()) {
 			case DIRECTED:
 				if (computeTriadicCensus.getValue()) {
 					triadicCensus = graph
 						.run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<K, VV, EV>()
-							.setLittleParallelism(lp));
+							.setParallelism(parallelism));
 				}
 
 				@SuppressWarnings("unchecked")
 				DataSet<PrintableResult> directedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
 					.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<K, VV, EV>()
 						.setSortTriangleVertices(sortTriangleVertices.getValue())
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 				return directedResult;
 
 			case UNDIRECTED:
 				if (computeTriadicCensus.getValue()) {
 					triadicCensus = graph
 						.run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<K, VV, EV>()
-							.setLittleParallelism(lp));
+							.setParallelism(parallelism));
 				}
 
 				@SuppressWarnings("unchecked")
 				DataSet<PrintableResult> undirectedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
 					.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<K, VV, EV>()
 						.setSortTriangleVertices(sortTriangleVertices.getValue())
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 				return undirectedResult;
 
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
index a9d05a2..f7364ce 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.CirculantGraph.MINIMUM_VERTEX_COUNT;
 
 /**
@@ -46,9 +45,6 @@ extends GeneratedGraph<LongValue> {
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	private List<OffsetRange> offsetRanges = new ArrayList<>();
 
 	@Override
@@ -118,7 +114,7 @@ extends GeneratedGraph<LongValue> {
 		}
 
 		return graph
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
index 64bae73..d3a7e49 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.CompleteGraph.MINIMUM_VERTEX_COUNT;
 
 /**
@@ -36,9 +35,6 @@ extends GeneratedGraph<LongValue> {
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ")";
@@ -52,7 +48,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	protected Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
 		return new org.apache.flink.graph.generator.CompleteGraph(env, vertexCount.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
index d84cfca..632ad77 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.CycleGraph.MINIMUM_VERTEX_COUNT;
 
 /**
@@ -36,9 +35,6 @@ extends GeneratedGraph<LongValue> {
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
@@ -52,7 +48,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.CycleGraph(env, vertexCount.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
index 5ca2f2f..1eb0dd9 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.EchoGraph.MINIMUM_VERTEX_COUNT;
 import static org.apache.flink.graph.generator.EchoGraph.MINIMUM_VERTEX_DEGREE;
 
@@ -40,9 +39,6 @@ extends GeneratedGraph<LongValue> {
 	private LongParameter vertexDegree = new LongParameter(this, "vertex_degree")
 		.setMinimumValue(MINIMUM_VERTEX_DEGREE);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ":" + vertexDegree.getValue() + ")";
@@ -56,7 +52,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	protected Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
 		return new org.apache.flink.graph.generator.EchoGraph(env, vertexCount.getValue(), vertexDegree.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
index 6feb3c8..f54fcba 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
@@ -48,6 +48,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.EmptyGraph(env, vertexCount.getValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
index a0446ee..f6d8ae9 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
@@ -26,6 +26,7 @@ import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.asm.translate.translators.LongValueToStringValue;
 import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.CharValue;
 import org.apache.flink.types.LongValue;
@@ -34,6 +35,8 @@ import org.apache.flink.types.ShortValue;
 
 import org.apache.commons.lang3.text.WordUtils;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Base class for generated graphs.
  *
@@ -65,6 +68,9 @@ extends InputBase<K, NullValue, NullValue> {
 		.addChoices(LONG, STRING)
 		.addHiddenChoices(BYTE, NATIVE_BYTE, SHORT, NATIVE_SHORT, CHAR, NATIVE_CHAR, NATIVE_INTEGER, NATIVE_LONG, NATIVE_STRING);
 
+	protected LongParameter parallelism = new LongParameter(this, "__parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
 	/**
 	 * The vertex count is verified to be no greater than the capacity of the
 	 * selected data type. All vertices must be counted even if skipped or

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
index 1b6bac1..b92b175 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -32,8 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Generate a {@link org.apache.flink.graph.generator.GridGraph}.
  */
@@ -44,9 +41,6 @@ extends GeneratedGraph<LongValue> {
 
 	private List<Dimension> dimensions = new ArrayList<>();
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getUsage() {
 		return "--" + PREFIX + "0 size:wrap_endpoints [--" + PREFIX + " size:wrap_endpoints [--" + PREFIX + " ...]] "
@@ -105,7 +99,7 @@ extends GeneratedGraph<LongValue> {
 		}
 
 		return graph
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
index 1be65bd..00b40c7 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.HypercubeGraph.MINIMUM_DIMENSIONS;
 
 /**
@@ -37,9 +36,6 @@ extends GeneratedGraph<LongValue> {
 		.setMinimumValue(MINIMUM_DIMENSIONS)
 		.setMaximumValue(63);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + dimensions + ")";
@@ -53,7 +49,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.HypercubeGraph(env, dimensions.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
index 7f3a3e5..aa0c18a 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT;
 
 /**
@@ -36,9 +35,6 @@ extends GeneratedGraph<LongValue> {
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
@@ -52,7 +48,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.PathGraph(env, vertexCount.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
index cce7afa..f24b99f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
@@ -32,8 +32,6 @@ import org.apache.flink.types.StringValue;
 
 import org.apache.commons.math3.random.JDKRandomGenerator;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Generate an {@code RMatGraph} with {@link IntValue}, {@link LongValue},
  * or {@link StringValue} keys.
@@ -79,9 +77,6 @@ extends GeneratedMultiGraph<LongValue> {
 	private LongParameter seed = new LongParameter(this, "seed")
 		.setDefaultValue(JDKRandomGeneratorFactory.DEFAULT_SEED);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() +
@@ -95,9 +90,7 @@ extends GeneratedMultiGraph<LongValue> {
 
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
-		int lp = littleParallelism.getValue().intValue();
-
-		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(seed.getValue());
 
 		long vertexCount = 1L << scale.getValue();
 		long edgeCount = vertexCount * edgeFactor.getValue();
@@ -106,7 +99,7 @@ extends GeneratedMultiGraph<LongValue> {
 				env, rnd, vertexCount, edgeCount)
 			.setConstants(a.getValue().floatValue(), b.getValue().floatValue(), c.getValue().floatValue())
 			.setNoise(noiseEnabled.getValue(), noise.getValue().floatValue())
-			.setParallelism(lp)
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
index 44da3f3..b0c7463 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT;
 
 /**
@@ -37,9 +36,6 @@ extends GeneratedGraph<LongValue> {
 		.setMinimumValue(MINIMUM_VERTEX_COUNT)
 		.setMaximumValue(1L << 62);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexPairCount + ")";
@@ -53,7 +49,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.SingletonEdgeGraph(env, vertexPairCount.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
index d488b59..cde25ce 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.StarGraph.MINIMUM_VERTEX_COUNT;
 
 /**
@@ -36,9 +35,6 @@ extends GeneratedGraph<LongValue> {
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
@@ -52,7 +48,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.StarGraph(env, vertexCount.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java
index 2e7c5b2..9049268 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java
@@ -21,6 +21,8 @@ package org.apache.flink.graph;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.util.Preconditions;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Base class for {@link GraphAnalytic}.
  *
@@ -34,6 +36,8 @@ implements GraphAnalytic<K, VV, EV, T> {
 
 	protected ExecutionEnvironment env;
 
+	protected int parallelism = PARALLELISM_DEFAULT;
+
 	@Override
 	public GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input)
 			throws Exception {
@@ -41,6 +45,27 @@ implements GraphAnalytic<K, VV, EV, T> {
 		return this;
 	}
 
+	/**
+	 * Set the parallelism for this analytic's operators. This parameter is
+	 * necessary because processing a small amount of data with high operator
+	 * parallelism is slow and wasteful with memory and buffers.
+	 *
+	 * <p>Operator parallelism should be set to this given value unless
+	 * processing asymptotically more data, in which case the default job
+	 * parallelism should be inherited.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public GraphAnalyticBase<K, VV, EV, T> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
 	@Override
 	public T execute()
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
index 685031c..7191bc9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -27,11 +27,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of a directed graph with the degree, out-degree, and
@@ -44,37 +40,6 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class EdgeDegreesPair<K, VV, EV>
 extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Degrees>>> {
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!EdgeDegreesPair.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
-
-		EdgeDegreesPair rhs = (EdgeDegreesPair) other;
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
-	}
-
 	@Override
 	public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
index 0299839..30d30fa 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -26,11 +26,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of a directed graph with the degree, out-degree, and
@@ -43,37 +39,6 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class EdgeSourceDegrees<K, VV, EV>
 extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeSourceDegrees<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
-
-		EdgeSourceDegrees rhs = (EdgeSourceDegrees) other;
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
-	}
-
 	@Override
 	public DataSet<Edge<K, Tuple2<EV, Degrees>>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index 7c13b39..57045a1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -26,11 +26,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of a directed graph with the degree, out-degree, and
@@ -43,37 +39,6 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class EdgeTargetDegrees<K, VV, EV>
 extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeTargetDegrees<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
-
-		EdgeTargetDegrees rhs = (EdgeTargetDegrees) other;
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
-	}
-
 	@Override
 	public DataSet<Edge<K, Tuple2<EV, Degrees>>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index b0cb7d7..06a7fd2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -39,9 +39,6 @@ import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates vertices of a directed graph with the degree, out-, and in-degree.
@@ -56,8 +53,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 	// Optional configuration
 	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default only the edge set is processed for the computation of degree.
 	 * When this flag is set an additional join is performed against the vertex
@@ -73,41 +68,24 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public VertexDegrees<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!VertexDegrees.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		VertexDegrees rhs = (VertexDegrees) other;
 
-		// verify that configurations can be merged
+		return !includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices);
+	}
 
-		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		VertexDegrees rhs = (VertexDegrees) other;
 
 		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 94c2667..dc071cf 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -29,9 +29,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates vertices of a directed graph with the in-degree.
@@ -46,8 +43,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 	// Optional configuration
 	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default only the edge set is processed for the computation of degree.
 	 * When this flag is set an additional join is performed against the vertex
@@ -63,44 +58,24 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public VertexInDegree<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!VertexInDegree.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		VertexInDegree rhs = (VertexInDegree) other;
 
-		// verify that configurations can be merged
+		return !includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices);
+	}
 
-		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		VertexInDegree rhs = (VertexInDegree) other;
 
 		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index 00f2f89..4a4689b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -29,9 +29,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates vertices of a directed graph with the out-degree.
@@ -46,8 +43,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 	// Optional configuration
 	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default only the edge set is processed for the computation of degree.
 	 * When this flag is set an additional join is performed against the vertex
@@ -63,44 +58,24 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public VertexOutDegree<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!VertexOutDegree.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		VertexOutDegree rhs = (VertexOutDegree) other;
 
-		// verify that configurations can be merged
+		return !includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices);
+	}
 
-		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		VertexOutDegree rhs = (VertexOutDegree) other;
 
 		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index 6228cac..ff4285f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -30,9 +30,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of an undirected graph with the degree of both the source
@@ -48,8 +45,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, L
 	// Optional configuration
 	private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * The degree can be counted from either the edge source or target IDs.
 	 * By default the source IDs are counted. Reducing on target IDs may
@@ -65,36 +60,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, L
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeDegreePair<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		EdgeDegreePair rhs = (EdgeDegreePair) other;
 
 		reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index 01ff7d0..bd8ce3d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -29,9 +29,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of an undirected graph with degree of the source vertex.
@@ -46,8 +43,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 	// Optional configuration
 	private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * The degree can be counted from either the edge source or target IDs.
 	 * By default the source IDs are counted. Reducing on target IDs may
@@ -63,36 +58,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeSourceDegree<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		EdgeSourceDegree rhs = (EdgeSourceDegree) other;
 
 		reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index d3316ea..cb18d2c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -29,9 +29,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of an undirected graph with degree of the target vertex.
@@ -46,8 +43,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 	// Optional configuration
 	private OptionalBoolean reduceOnSourceId = new OptionalBoolean(false, false);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * The degree can be counted from either the edge source or target IDs.
 	 * By default the target IDs are counted. Reducing on source IDs may
@@ -63,36 +58,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeTargetDegree<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		EdgeTargetDegree rhs = (EdgeTargetDegree) other;
 
 		reduceOnSourceId.mergeWith(rhs.reduceOnSourceId);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index 626c11b..d2fad18 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -32,9 +32,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates vertices of an undirected graph with the degree.
@@ -51,8 +48,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 
 	private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default only the edge set is processed for the computation of degree.
 	 * When this flag is set an additional join is performed against the vertex
@@ -83,45 +78,25 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public VertexDegree<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!VertexDegree.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		VertexDegree rhs = (VertexDegree) other;
 
-		// verify that configurations can be merged
+		return !includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices);
+	}
 
-		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		VertexDegree rhs = (VertexDegree) other;
 
 		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
 		reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
index b485507..522d39c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -36,8 +36,6 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Removes vertices from a graph with degree greater than the given maximum.
  * Any edge with with a source or target vertex with degree greater than the
@@ -58,8 +56,6 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
 	private OptionalBoolean broadcastHighDegreeVertices = new OptionalBoolean(false, false);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Filter out vertices with degree greater than the given maximum.
 	 *
@@ -103,42 +99,25 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public MaximumDegree<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!MaximumDegree.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		MaximumDegree rhs = (MaximumDegree) other;
 
-		// verify that configurations can be merged
+		return maximumDegree == rhs.maximumDegree;
+	}
 
-		if (maximumDegree != rhs.maximumDegree) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		MaximumDegree rhs = (MaximumDegree) other;
 
 		reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
 		broadcastHighDegreeVertices.mergeWith(rhs.broadcastHighDegreeVertices);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	/*

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 0d60cd9..1bab9c6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -22,11 +22,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Remove self-loops and duplicate edges from a directed graph.
@@ -38,40 +34,6 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class Simplify<K extends Comparable<K>, VV, EV>
 extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public Simplify<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!Simplify.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
-
-		Simplify rhs = (Simplify) other;
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
-	}
-
 	@Override
 	public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index dd3a9b3..6f1e282 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -25,9 +25,6 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Add symmetric edges and remove self-loops and duplicate edges from an
@@ -43,9 +40,6 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 	// Required configuration
 	private boolean clipAndFlip;
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Simplifies an undirected graph by adding reverse edges and removing
 	 * self-loops and duplicate edges.
@@ -59,43 +53,15 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 		this.clipAndFlip = clipAndFlip;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public Simplify<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!Simplify.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		Simplify rhs = (Simplify) other;
 
-		// verify that configurations can be merged
-
-		if (clipAndFlip != rhs.clipAndFlip) {
-			return false;
-		}
-
-		// merge configurations
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
+		return clipAndFlip == rhs.clipAndFlip;
 	}
 
 	@Override