You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by gr...@apache.org on 2017/07/11 13:57:19 UTC

[1/7] flink git commit: [FLINK-6882] [runtime] Activate checkstyle for runtime/registration

Repository: flink
Updated Branches:
  refs/heads/master 4aa2ffcef -> d0cc2c178


[FLINK-6882] [runtime] Activate checkstyle for runtime/registration

This closes #4099


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a68c15f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a68c15f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a68c15f0

Branch: refs/heads/master
Commit: a68c15f0b3c530e7231df8507ad562b268f19f41
Parents: 4aa2ffc
Author: Greg Hogan <co...@greghogan.com>
Authored: Fri Jun 9 16:32:46 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jul 11 07:48:41 2017 -0400

----------------------------------------------------------------------
 flink-runtime/pom.xml                           |  1 -
 .../registration/RegisteredRpcConnection.java   | 31 ++++++++------------
 .../registration/RegistrationResponse.java      |  8 ++---
 .../registration/RetryingRegistration.java      | 20 ++++++-------
 .../RegisteredRpcConnectionTest.java            | 11 +++----
 .../registration/RetryingRegistrationTest.java  | 18 ++++++++----
 .../registration/TestRegistrationGateway.java   |  9 ++++--
 7 files changed, 50 insertions(+), 48 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a68c15f0/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index d97c0e9..8800f74 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -444,7 +444,6 @@ under the License.
 						**/runtime/messages/**,
 						**/runtime/minicluster/**,
 						**/runtime/operators/**,
-						**/runtime/registration/**,
 						**/runtime/resourcemanager/**,
 						**/runtime/rpc/**,
 						**/runtime/state/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/a68c15f0/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
index 8ebbf92..b477546 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java
@@ -19,10 +19,10 @@
 package org.apache.flink.runtime.registration;
 
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.runtime.rpc.RpcGateway;
 import org.apache.flink.runtime.concurrent.AcceptFunction;
 import org.apache.flink.runtime.concurrent.ApplyFunction;
 import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.rpc.RpcGateway;
 
 import org.slf4j.Logger;
 
@@ -46,35 +46,30 @@ import static org.apache.flink.util.Preconditions.checkState;
  */
 public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Success extends RegistrationResponse.Success> {
 
-	/** the logger for all log messages of this class */
+	/** The logger for all log messages of this class. */
 	protected final Logger log;
 
-	/** the target component leaderID, for example the ResourceManager leaderID */
+	/** The target component leaderID, for example the ResourceManager leaderID. */
 	private final UUID targetLeaderId;
 
-	/** the target component Address, for example the ResourceManager Address */
+	/** The target component Address, for example the ResourceManager Address. */
 	private final String targetAddress;
 
-	/** Execution context to be used to execute the on complete action of the ResourceManagerRegistration */
+	/** Execution context to be used to execute the on complete action of the ResourceManagerRegistration. */
 	private final Executor executor;
 
-	/** the Registration of this RPC connection */
+	/** The Registration of this RPC connection. */
 	private RetryingRegistration<Gateway, Success> pendingRegistration;
 
-	/** the gateway to register, it's null until the registration is completed */
+	/** The gateway to register, it's null until the registration is completed. */
 	private volatile Gateway targetGateway;
 
-	/** flag indicating that the RPC connection is closed */
+	/** Flag indicating that the RPC connection is closed. */
 	private volatile boolean closed;
 
 	// ------------------------------------------------------------------------
 
-	public RegisteredRpcConnection(
-		Logger log,
-		String targetAddress,
-		UUID targetLeaderId,
-		Executor executor)
-	{
+	public RegisteredRpcConnection(Logger log, String targetAddress, UUID targetLeaderId, Executor executor) {
 		this.log = checkNotNull(log);
 		this.targetAddress = checkNotNull(targetAddress);
 		this.targetLeaderId = checkNotNull(targetLeaderId);
@@ -114,22 +109,22 @@ public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Succes
 	}
 
 	/**
-	 * This method generate a specific Registration, for example TaskExecutor Registration at the ResourceManager
+	 * This method generate a specific Registration, for example TaskExecutor Registration at the ResourceManager.
 	 */
 	protected abstract RetryingRegistration<Gateway, Success> generateRegistration();
 
 	/**
-	 * This method handle the Registration Response
+	 * This method handle the Registration Response.
 	 */
 	protected abstract void onRegistrationSuccess(Success success);
 
 	/**
-	 * This method handle the Registration failure
+	 * This method handle the Registration failure.
 	 */
 	protected abstract void onRegistrationFailure(Throwable failure);
 
 	/**
-	 * close connection
+	 * Close connection.
 	 */
 	public void close() {
 		closed = true;

http://git-wip-us.apache.org/repos/asf/flink/blob/a68c15f0/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationResponse.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationResponse.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationResponse.java
index fefcc78..9628e26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationResponse.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegistrationResponse.java
@@ -28,14 +28,14 @@ public abstract class RegistrationResponse implements Serializable {
 	private static final long serialVersionUID = 1L;
 
 	// ----------------------------------------------------------------------------
-	
+
 	/**
 	 * Base class for a successful registration. Concrete registration implementations
 	 * will typically extend this class to attach more information.
 	 */
 	public static class Success extends RegistrationResponse {
 		private static final long serialVersionUID = 1L;
-		
+
 		@Override
 		public String toString() {
 			return "Registration Successful";
@@ -50,12 +50,12 @@ public abstract class RegistrationResponse implements Serializable {
 	public static final class Decline extends RegistrationResponse {
 		private static final long serialVersionUID = 1L;
 
-		/** the rejection reason */
+		/** The rejection reason. */
 		private final String reason;
 
 		/**
 		 * Creates a new rejection message.
-		 * 
+		 *
 		 * @param reason The reason for the rejection.
 		 */
 		public Decline(String reason) {

http://git-wip-us.apache.org/repos/asf/flink/blob/a68c15f0/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index a470b49..c5c03bd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -42,11 +42,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull;
  * for example registering the TaskExecutor at the ResourceManager.
  * This {@code RetryingRegistration} implements both the initial address resolution
  * and the retries-with-backoff strategy.
- * 
+ *
  * <p>The registration gives access to a future that is completed upon successful registration.
  * The registration can be canceled, for example when the target where it tries to register
  * at looses leader status.
- * 
+ *
  * @param <Gateway> The type of the gateway to connect to.
  * @param <Success> The type of the successful registration responses.
  */
@@ -56,16 +56,16 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 	//  default configuration values
 	// ------------------------------------------------------------------------
 
-	/** default value for the initial registration timeout (milliseconds) */
+	/** Default value for the initial registration timeout (milliseconds). */
 	private static final long INITIAL_REGISTRATION_TIMEOUT_MILLIS = 100;
 
-	/** default value for the maximum registration timeout, after exponential back-off (milliseconds) */
+	/** Default value for the maximum registration timeout, after exponential back-off (milliseconds). */
 	private static final long MAX_REGISTRATION_TIMEOUT_MILLIS = 30000;
 
-	/** The pause (milliseconds) made after an registration attempt caused an exception (other than timeout) */
+	/** The pause (milliseconds) made after an registration attempt caused an exception (other than timeout). */
 	private static final long ERROR_REGISTRATION_DELAY_MILLIS = 10000;
 
-	/** The pause (milliseconds) made after the registration attempt was refused */
+	/** The pause (milliseconds) made after the registration attempt was refused. */
 	private static final long REFUSED_REGISTRATION_DELAY_MILLIS = 30000;
 
 	// ------------------------------------------------------------------------
@@ -113,7 +113,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 	public RetryingRegistration(
 			Logger log,
 			RpcService rpcService,
-			String targetName, 
+			String targetName,
 			Class<Gateway> targetType,
 			String targetAddress,
 			UUID leaderId,
@@ -180,7 +180,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 		try {
 			// trigger resolution of the resource manager address to a callable gateway
 			Future<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType);
-	
+
 			// upon success, start the registration attempts
 			Future<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync(new AcceptFunction<Gateway>() {
 				@Override
@@ -223,7 +223,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 		try {
 			log.info("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis);
 			Future<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis);
-	
+
 			// if the registration was successful, let the TaskExecutor know
 			Future<Void> registrationAcceptFuture = registrationFuture.thenAcceptAsync(new AcceptFunction<RegistrationResponse>() {
 				@Override
@@ -249,7 +249,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e
 					}
 				}
 			}, rpcService.getExecutor());
-	
+
 			// upon failure, retry
 			registrationAcceptFuture.exceptionallyAsync(new ApplyFunction<Throwable, Void>() {
 				@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/a68c15f0/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
index 8558205..a454867 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java
@@ -22,16 +22,17 @@ import org.apache.flink.runtime.registration.RetryingRegistrationTest.TestRegist
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.util.TestLogger;
+
 import org.junit.Test;
 import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
 import java.util.concurrent.Executor;
 
-import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.mock;
@@ -117,7 +118,7 @@ public class RegisteredRpcConnectionTest extends TestLogger {
 		TestRegistrationGateway testGateway = new TestRegistrationGateway(new RetryingRegistrationTest.TestRegistrationSuccess(connectionID));
 		TestingRpcService rpcService = new TestingRpcService();
 
-		try{
+		try {
 			rpcService.registerGateway(testRpcConnectionEndpointAddress, testGateway);
 
 			TestRpcConnection connection = new TestRpcConnection(testRpcConnectionEndpointAddress, leaderId, rpcService.getExecutor(), rpcService);
@@ -148,11 +149,7 @@ public class RegisteredRpcConnectionTest extends TestLogger {
 
 		private String failureMessage;
 
-		public TestRpcConnection(String targetAddress,
-								 UUID targetLeaderId,
-								 Executor executor,
-								 RpcService rpcService)
-		{
+		public TestRpcConnection(String targetAddress, UUID targetLeaderId, Executor executor,  RpcService rpcService) {
 			super(LoggerFactory.getLogger(RegisteredRpcConnectionTest.class), targetAddress, targetLeaderId, executor);
 			this.rpcService = rpcService;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/a68c15f0/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index 6d6bbef..0c2134f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -26,7 +26,6 @@ import org.apache.flink.runtime.rpc.TestingRpcService;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
-
 import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
@@ -36,8 +35,17 @@ import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
-import static org.junit.Assert.*;
-import static org.mockito.Mockito.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.anyLong;
+import static org.mockito.Mockito.anyString;
+import static org.mockito.Mockito.atMost;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
 /**
  * Tests for the generic retrying registration class, validating the failure, retry, and back-off behavior.
@@ -298,7 +306,7 @@ public class RetryingRegistrationTest extends TestLogger {
 	//  test registration
 	// ------------------------------------------------------------------------
 
-	protected static class TestRegistrationSuccess extends RegistrationResponse.Success {
+	static class TestRegistrationSuccess extends RegistrationResponse.Success {
 		private static final long serialVersionUID = 5542698790917150604L;
 
 		private final String correlationId;
@@ -312,7 +320,7 @@ public class RetryingRegistrationTest extends TestLogger {
 		}
 	}
 
-	protected static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
+	static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> {
 
 		// we use shorter timeouts here to speed up the tests
 		static final long INITIAL_TIMEOUT = 20;

http://git-wip-us.apache.org/repos/asf/flink/blob/a68c15f0/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
index 2843aeb..1b23fa3 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/TestRegistrationGateway.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.registration;
 
-import akka.dispatch.Futures;
-
 import org.apache.flink.runtime.concurrent.Future;
 import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
 import org.apache.flink.runtime.rpc.TestingGatewayBase;
@@ -29,6 +27,9 @@ import java.util.UUID;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 
+/**
+ * Mock gateway for {@link RegistrationResponse}.
+ */
 public class TestRegistrationGateway extends TestingGatewayBase {
 
 	private final BlockingQueue<RegistrationCall> invocations;
@@ -42,7 +43,6 @@ public class TestRegistrationGateway extends TestingGatewayBase {
 
 		this.invocations = new LinkedBlockingQueue<>();
 		this.responses = responses;
-		
 	}
 
 	// ------------------------------------------------------------------------
@@ -65,6 +65,9 @@ public class TestRegistrationGateway extends TestingGatewayBase {
 
 	// ------------------------------------------------------------------------
 
+	/**
+	 * Invocation parameters.
+	 */
 	public static class RegistrationCall {
 		private final UUID leaderId;
 		private final long timeout;


[2/7] flink git commit: [FLINK-6903] [runtime] Activate checkstyle for runtime/akka

Posted by gr...@apache.org.
[FLINK-6903] [runtime] Activate checkstyle for runtime/akka

This closes #4114


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/b91df160
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/b91df160
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/b91df160

Branch: refs/heads/master
Commit: b91df16071145911a85a3e76a821a7ff6709b34f
Parents: a68c15f
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Jun 12 13:50:07 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jul 11 07:49:35 2017 -0400

----------------------------------------------------------------------
 flink-runtime/pom.xml                            |  1 -
 .../runtime/akka/DefaultQuarantineHandler.java   |  6 ++++--
 .../flink/runtime/akka/FlinkUntypedActor.java    | 17 ++++++++++-------
 .../flink/runtime/akka/QuarantineHandler.java    |  4 ++--
 .../flink/runtime/akka/QuarantineMonitor.java    |  5 +++--
 .../runtime/akka/FlinkUntypedActorTest.java      | 14 +++++++++-----
 .../runtime/akka/QuarantineMonitorTest.java      | 19 ++++++++++++-------
 7 files changed, 40 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/pom.xml
----------------------------------------------------------------------
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 8800f74..5644cfd 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -426,7 +426,6 @@ under the License.
 					<logViolationsToConsole>true</logViolationsToConsole>
 					<failOnViolation>true</failOnViolation>
 					<excludes>
-						**/runtime/akka/**,
 						**/runtime/blob/**,
 						**/runtime/checkpoint/**,
 						**/runtime/client/**,

http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
index 378cb25..708437f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/DefaultQuarantineHandler.java
@@ -18,11 +18,13 @@
 
 package org.apache.flink.runtime.akka;
 
-import akka.actor.ActorSystem;
-import akka.actor.Address;
 import org.apache.flink.api.common.time.Time;
 import org.apache.flink.util.Preconditions;
+
+import akka.actor.ActorSystem;
+import akka.actor.Address;
 import org.slf4j.Logger;
+
 import scala.concurrent.duration.FiniteDuration;
 
 /**

http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
index 05ae501..8078c26 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/FlinkUntypedActor.java
@@ -18,9 +18,10 @@
 
 package org.apache.flink.runtime.akka;
 
-import akka.actor.UntypedActor;
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
 import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+
+import akka.actor.UntypedActor;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,20 +34,22 @@ import java.util.UUID;
  * of type {@link RequiresLeaderSessionID} without being wrapped in a LeaderSessionMessage is
  * detected, then an Exception is thrown.
  *
- * In order to implement the actor behavior, an implementing subclass has to override the method
+ * <p>In order to implement the actor behavior, an implementing subclass has to override the method
  * handleMessage, which defines how messages are processed. Furthermore, the subclass has to provide
  * a leader session ID option which is returned by getLeaderSessionID.
  */
 public abstract class FlinkUntypedActor extends UntypedActor {
 
+	//CHECKSTYLE.OFF: MemberNameCheck - re-enable after JobManager/TaskManager refactoring in FLIP-6?
 	protected final Logger LOG = LoggerFactory.getLogger(getClass());
+	//CHECKSTYLE.ON: MemberNameCheck
 
 	/**
 	 * This method is called by Akka if a new message has arrived for the actor. It logs the
 	 * processing time of the incoming message if the logging level is set to debug. After logging
 	 * the handleLeaderSessionID method is called.
 	 *
-	 * Important: This method cannot be overriden. The actor specific message handling logic is
+	 * <p>Important: This method cannot be overriden. The actor specific message handling logic is
 	 * implemented by the method handleMessage.
 	 *
 	 * @param message Incoming message
@@ -54,14 +57,14 @@ public abstract class FlinkUntypedActor extends UntypedActor {
 	 */
 	@Override
 	public final void onReceive(Object message) throws Exception {
-		if(LOG.isTraceEnabled()) {
+		if (LOG.isTraceEnabled()) {
 			LOG.trace("Received message {} at {} from {}.", message, getSelf().path(), getSender());
 
 			long start = System.nanoTime();
 
 			handleLeaderSessionID(message);
 
-			long duration = (System.nanoTime() - start)/ 1_000_000;
+			long duration = (System.nanoTime() - start) / 1_000_000;
 
 			LOG.trace("Handled message {} in {} ms from {}.", message, duration, getSender());
 		} else {
@@ -124,14 +127,14 @@ public abstract class FlinkUntypedActor extends UntypedActor {
 	 * Returns the current leader session ID associcated with this actor.
 	 * @return
 	 */
-	abstract protected UUID getLeaderSessionID();
+	protected abstract UUID getLeaderSessionID();
 
 	/**
 	 * This method should be called for every outgoing message. It wraps messages which require
 	 * a leader session ID (indicated by {@link RequiresLeaderSessionID}) in a
 	 * {@link LeaderSessionMessage} with the actor's leader session ID.
 	 *
-	 * This method can be overriden to implement a different decoration behavior.
+	 * <p>This method can be overriden to implement a different decoration behavior.
 	 *
 	 * @param message Message to be decorated
 	 * @return The deocrated message

http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
index 21623e8..e6d3820 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineHandler.java
@@ -33,7 +33,7 @@ public interface QuarantineHandler {
 	 *                     actor system
 	 * @param actorSystem which has been quarantined
 	 */
-	void wasQuarantinedBy(final String remoteSystem, final ActorSystem actorSystem);
+	void wasQuarantinedBy(String remoteSystem, ActorSystem actorSystem);
 
 	/**
 	 * Callback when the given actor system has quarantined the given remote actor system.
@@ -42,5 +42,5 @@ public interface QuarantineHandler {
 	 *                     by our actor system
 	 * @param actorSystem which has quarantined the other actor system
 	 */
-	void hasQuarantined(final String remoteSystem, final ActorSystem actorSystem);
+	void hasQuarantined(String remoteSystem, ActorSystem actorSystem);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
index de82f29..c1075eb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/akka/QuarantineMonitor.java
@@ -18,10 +18,11 @@
 
 package org.apache.flink.runtime.akka;
 
+import org.apache.flink.util.Preconditions;
+
 import akka.actor.UntypedActor;
 import akka.remote.AssociationErrorEvent;
 import akka.remote.transport.Transport;
-import org.apache.flink.util.Preconditions;
 import org.slf4j.Logger;
 
 import java.util.regex.Matcher;
@@ -33,7 +34,7 @@ import java.util.regex.Pattern;
  * or quarantine another remote actor system. If the actor detects that the actor system has been
  * quarantined or quarantined another system, then the {@link QuarantineHandler} is called.
  *
- * IMPORTANT: The implementation if highly specific for Akka 2.3.7. With different version the
+ * <p>IMPORTANT: The implementation if highly specific for Akka 2.3.7. With different version the
  * quarantine state might be detected differently.
  */
 public class QuarantineMonitor extends UntypedActor {

http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
index 3ffc68f..00a8475 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/FlinkUntypedActorTest.java
@@ -18,15 +18,16 @@
 
 package org.apache.flink.runtime.akka;
 
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Kill;
 import akka.actor.Props;
 import akka.testkit.JavaTestKit;
 import akka.testkit.TestActorRef;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.runtime.messages.RequiresLeaderSessionID;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -36,6 +37,9 @@ import java.util.UUID;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
 
+/**
+ * Tests for {@link FlinkUntypedActor}.
+ */
 public class FlinkUntypedActorTest {
 
 	private static ActorSystem actorSystem;
@@ -89,7 +93,7 @@ public class FlinkUntypedActorTest {
 
 		TestActorRef<PlainFlinkUntypedActor> actor = null;
 
-		try{
+		try {
 			final Props props = Props.create(PlainFlinkUntypedActor.class, leaderSessionID);
 			actor = TestActorRef.create(actorSystem, props);
 
@@ -113,7 +117,7 @@ public class FlinkUntypedActorTest {
 	}
 
 	private static void stopActor(ActorRef actor) {
-		if(actor != null) {
+		if (actor != null) {
 			actor.tell(Kill.getInstance(), ActorRef.noSender());
 		}
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/b91df160/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
index 97309a4..09e829e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/akka/QuarantineMonitorTest.java
@@ -18,6 +18,13 @@
 
 package org.apache.flink.runtime.akka;
 
+import org.apache.flink.runtime.concurrent.CompletableFuture;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.TestLogger;
+
 import akka.actor.ActorRef;
 import akka.actor.ActorSystem;
 import akka.actor.Address;
@@ -26,12 +33,6 @@ import akka.actor.UntypedActor;
 import akka.dispatch.OnComplete;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
-import org.apache.flink.runtime.concurrent.CompletableFuture;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkCompletableFuture;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
-import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.TestLogger;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -40,12 +41,16 @@ import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
-import scala.concurrent.duration.FiniteDuration;
 
 import java.util.Properties;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
+import scala.concurrent.duration.FiniteDuration;
+
+/**
+ * Tests for {@link QuarantineMonitor}.
+ */
 public class QuarantineMonitorTest extends TestLogger {
 
 	private static final Logger LOG = LoggerFactory.getLogger(QuarantineMonitorTest.class);


[5/7] flink git commit: [FLINK-6358] [gelly] Write job details for Gelly examples

Posted by gr...@apache.org.
[FLINK-6358] [gelly] Write job details for Gelly examples

Add an option to write job details to a file in JSON format. Job details
include: job ID, runtime, parameters with values, and accumulators with
values.

This closes #4170


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/273223f0
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/273223f0
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/273223f0

Branch: refs/heads/master
Commit: 273223f0143f4af2ac8416af5322e384ec02ab1f
Parents: be4853d
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Jun 21 10:25:57 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jul 11 08:51:22 2017 -0400

----------------------------------------------------------------------
 .../java/org/apache/flink/graph/Runner.java     | 97 +++++++++++++++++++-
 1 file changed, 92 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/273223f0/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
index af2a11c..d0e6a92 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/Runner.java
@@ -19,6 +19,7 @@
 package org.apache.flink.graph;
 
 import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobExecutionResult;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.utils.ParameterTool;
@@ -48,14 +49,24 @@ import org.apache.flink.graph.drivers.input.StarGraph;
 import org.apache.flink.graph.drivers.output.Hash;
 import org.apache.flink.graph.drivers.output.Output;
 import org.apache.flink.graph.drivers.output.Print;
+import org.apache.flink.graph.drivers.parameter.BooleanParameter;
 import org.apache.flink.graph.drivers.parameter.Parameterized;
+import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
+import org.apache.flink.graph.drivers.parameter.StringParameter;
+import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.util.InstantiationUtil;
 
+import com.fasterxml.jackson.core.JsonEncoding;
+import com.fasterxml.jackson.core.JsonFactory;
+import com.fasterxml.jackson.core.JsonGenerator;
 import org.apache.commons.lang3.text.StrBuilder;
 
+import java.io.File;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 
 /**
  * This default main class executes Flink drivers.
@@ -70,7 +81,8 @@ import java.util.List;
  * <p>Algorithms must explicitly support each type of output via implementation of
  * interfaces. This is scalable as the number of outputs is small and finite.
  */
-public class Runner {
+public class Runner
+extends ParameterizedBase {
 
 	private static final String INPUT = "input";
 
@@ -108,6 +120,27 @@ public class Runner {
 		.addClass(Hash.class)
 		.addClass(Print.class);
 
+	private final ParameterTool parameters;
+
+	private final BooleanParameter disableObjectReuse = new BooleanParameter(this, "__disable_object_reuse");
+
+	private final StringParameter jobDetailsPath = new StringParameter(this, "__job_details_path")
+		.setDefaultValue(null);
+
+	/**
+	 * Create an algorithm runner from the given arguments.
+	 *
+	 * @param args command-line arguments
+	 */
+	public Runner(String[] args) {
+		parameters = ParameterTool.fromArgs(args);
+	}
+
+	@Override
+	public String getName() {
+		return this.getClass().getSimpleName();
+	}
+
 	/**
 	 * List available algorithms. This is displayed to the user when no valid
 	 * algorithm is given in the program parameterization.
@@ -192,21 +225,26 @@ public class Runner {
 			.toString();
 	}
 
-	public static void main(String[] args) throws Exception {
+	public void run() throws Exception {
 		// Set up the execution environment
 		final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
 		ExecutionConfig config = env.getConfig();
 
 		// should not have any non-Flink data types
-		config.disableAutoTypeRegistration();
 		config.disableForceAvro();
 		config.disableForceKryo();
 
-		ParameterTool parameters = ParameterTool.fromArgs(args);
 		config.setGlobalJobParameters(parameters);
 
+		// configure local parameters and throw proper exception on error
+		try {
+			this.configure(parameters);
+		} catch (RuntimeException ex) {
+			throw new ProgramParametrizationException(ex.getMessage());
+		}
+
 		// integration tests run with with object reuse both disabled and enabled
-		if (parameters.has("__disable_object_reuse")) {
+		if (disableObjectReuse.getValue()) {
 			config.disableObjectReuse();
 		} else {
 			config.enableObjectReuse();
@@ -296,6 +334,55 @@ public class Runner {
 		}
 
 		algorithm.printAnalytics(System.out);
+
+		if (jobDetailsPath.getValue() != null) {
+			writeJobDetails(env, jobDetailsPath.getValue());
+		}
+	}
+
+	/**
+	 * Write the following job details as a JSON encoded file: runtime environment
+	 * job ID, runtime, parameters, and accumulators.
+	 *
+	 * @param env the execution environment
+	 * @param jobDetailsPath filesystem path to write job details
+	 * @throws IOException on error writing to jobDetailsPath
+	 */
+	private static void writeJobDetails(ExecutionEnvironment env, String jobDetailsPath) throws IOException {
+		JobExecutionResult result = env.getLastJobExecutionResult();
+
+		File jsonFile = new File(jobDetailsPath);
+
+		try (JsonGenerator json = new JsonFactory().createGenerator(jsonFile, JsonEncoding.UTF8)) {
+			json.writeStartObject();
+
+			json.writeObjectFieldStart("Apache Flink");
+			json.writeStringField("version", EnvironmentInformation.getVersion());
+			json.writeStringField("commit ID", EnvironmentInformation.getRevisionInformation().commitId);
+			json.writeStringField("commit date", EnvironmentInformation.getRevisionInformation().commitDate);
+			json.writeEndObject();
+
+			json.writeStringField("job_id", result.getJobID().toString());
+			json.writeNumberField("runtime_ms", result.getNetRuntime());
+
+			json.writeObjectFieldStart("parameters");
+			for (Map.Entry<String, String> entry : env.getConfig().getGlobalJobParameters().toMap().entrySet()) {
+				json.writeStringField(entry.getKey(), entry.getValue());
+			}
+			json.writeEndObject();
+
+			json.writeObjectFieldStart("accumulators");
+			for (Map.Entry<String, Object> entry : result.getAllAccumulatorResults().entrySet()) {
+				json.writeStringField(entry.getKey(), entry.getValue().toString());
+			}
+			json.writeEndObject();
+
+			json.writeEndObject();
+		}
+	}
+
+	public static void main(String[] args) throws Exception {
+		new Runner(args).run();
 	}
 
 	/**


[3/7] flink git commit: [FLINK-6357] [java] ParameterTool get unrequested parameters

Posted by gr...@apache.org.
[FLINK-6357] [java] ParameterTool get unrequested parameters

Adds ParameterTool#getUnrequestedParameters returning a Set<String> of
parameter arguments names not yet requested by ParameterTool#has or any
of the ParameterTool#get methods.

This closes #4169


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/be4853d7
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/be4853d7
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/be4853d7

Branch: refs/heads/master
Commit: be4853d79893da16b6cf20d5275dba42668b9103
Parents: 6a59e2f
Author: Greg Hogan <co...@greghogan.com>
Authored: Wed Jun 21 08:18:55 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jul 11 08:51:22 2017 -0400

----------------------------------------------------------------------
 .../flink/api/java/utils/ParameterTool.java     |  24 +-
 .../flink/api/java/utils/ParameterToolTest.java | 433 ++++++++++++++++++-
 2 files changed, 444 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/be4853d7/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
----------------------------------------------------------------------
diff --git a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
index 8e15441..b76bcc0 100644
--- a/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
+++ b/flink-java/src/main/java/org/apache/flink/api/java/utils/ParameterTool.java
@@ -34,9 +34,12 @@ import java.io.IOException;
 import java.io.OutputStream;
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 
 /**
  * This class provides simple utility methods for reading and parsing program arguments from different sources
@@ -208,11 +211,23 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 
 	// ------------------ ParameterUtil  ------------------------
 	protected final Map<String, String> data;
-	protected final HashMap<String, String> defaultData;
+	protected final Map<String, String> defaultData;
+	protected final Set<String> unrequestedParameters;
 
 	private ParameterTool(Map<String, String> data) {
-		this.data = new HashMap<String, String>(data);
-		this.defaultData = new HashMap<String, String>();
+		this.data = new HashMap<>(data);
+		this.defaultData = new HashMap<>();
+		this.unrequestedParameters = new HashSet<>(data.keySet());
+	}
+
+	/**
+	 * Returns the set of parameter names which have not been requested with
+	 * {@link #has(String)} or one of the {@code get} methods. Access to the
+	 * map returned by {@link #toMap()} is not tracked.
+	 */
+	@PublicEvolving
+	public Set<String> getUnrequestedParameters() {
+		return Collections.unmodifiableSet(unrequestedParameters);
 	}
 
 	// ------------------ Get data from the util ----------------
@@ -230,6 +245,7 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 	 */
 	public String get(String key) {
 		addToDefaults(key, null);
+		unrequestedParameters.remove(key);
 		return data.get(key);
 	}
 
@@ -266,6 +282,7 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 	 */
 	public boolean has(String value) {
 		addToDefaults(value, null);
+		unrequestedParameters.remove(value);
 		return data.containsKey(value);
 	}
 
@@ -548,6 +565,7 @@ public class ParameterTool extends ExecutionConfig.GlobalJobParameters implement
 	public ParameterTool mergeWith(ParameterTool other) {
 		ParameterTool ret = new ParameterTool(this.data);
 		ret.data.putAll(other.data);
+		ret.unrequestedParameters.addAll(other.unrequestedParameters);
 		return ret;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/be4853d7/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
----------------------------------------------------------------------
diff --git a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
index 9b63985..1924ea3 100644
--- a/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
+++ b/flink-java/src/test/java/org/apache/flink/api/java/utils/ParameterToolTest.java
@@ -18,24 +18,33 @@
 
 package org.apache.flink.api.java.utils;
 
+import com.google.common.collect.Sets;
 import org.junit.Assert;
+import org.junit.Rule;
 import org.junit.Test;
+import org.junit.rules.ExpectedException;
 
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.Collections;
 import java.util.Map;
 import java.util.Properties;
 
+/**
+ * Tests for {@link ParameterTool}.
+ */
 public class ParameterToolTest extends AbstractParameterToolTest {
 
+	@Rule
+	public final ExpectedException exception = ExpectedException.none();
+
 	// ----- Parser tests -----------------
 
 	@Test(expected = RuntimeException.class)
 	public void testIllegalArgs() {
-		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"berlin"});
-		Assert.assertEquals(0, parameter.getNumberOfParameters());
+		ParameterTool.fromArgs(new String[]{"berlin"});
 	}
 
 	@Test
@@ -78,18 +87,12 @@ public class ParameterToolTest extends AbstractParameterToolTest {
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testEmptyVal() {
-		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--a", "-b", "--"});
-		Assert.assertEquals(2, parameter.getNumberOfParameters());
-		Assert.assertTrue(parameter.has("a"));
-		Assert.assertTrue(parameter.has("b"));
+		ParameterTool.fromArgs(new String[]{"--a", "-b", "--"});
 	}
 
 	@Test(expected = IllegalArgumentException.class)
 	public void testEmptyValShort() {
-		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"--a", "-b", "-"});
-		Assert.assertEquals(2, parameter.getNumberOfParameters());
-		Assert.assertTrue(parameter.has("a"));
-		Assert.assertTrue(parameter.has("b"));
+		ParameterTool.fromArgs(new String[]{"--a", "-b", "-"});
 	}
 
 	@Test
@@ -154,4 +157,414 @@ public class ParameterToolTest extends AbstractParameterToolTest {
 		ParameterTool parameter = ParameterTool.fromGenericOptionsParser(new String[]{"-D", "input=myInput", "-DexpectedCount=15"});
 		validate(parameter);
 	}
+
+	// Boolean
+
+	@Test
+	public void testUnrequestedBoolean() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-boolean", "true"});
+		Assert.assertEquals(Sets.newHashSet("boolean"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertTrue(parameter.getBoolean("boolean"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertTrue(parameter.getBoolean("boolean"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedBooleanWithDefaultValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-boolean", "true"});
+		Assert.assertEquals(Sets.newHashSet("boolean"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertTrue(parameter.getBoolean("boolean", false));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertTrue(parameter.getBoolean("boolean", false));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedBooleanWithMissingValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-boolean"});
+		Assert.assertEquals(Sets.newHashSet("boolean"), parameter.getUnrequestedParameters());
+
+		parameter.getBoolean("boolean");
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	// Byte
+
+	@Test
+	public void testUnrequestedByte() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-byte", "1"});
+		Assert.assertEquals(Sets.newHashSet("byte"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals(1, parameter.getByte("byte"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals(1, parameter.getByte("byte"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedByteWithDefaultValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-byte", "1"});
+		Assert.assertEquals(Sets.newHashSet("byte"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals(1, parameter.getByte("byte", (byte) 0));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals(1, parameter.getByte("byte", (byte) 0));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedByteWithMissingValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-byte"});
+		Assert.assertEquals(Sets.newHashSet("byte"), parameter.getUnrequestedParameters());
+
+		exception.expect(RuntimeException.class);
+		exception.expectMessage("For input string: \"__NO_VALUE_KEY\"");
+
+		parameter.getByte("byte");
+	}
+
+	// Short
+
+	@Test
+	public void testUnrequestedShort() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-short", "2"});
+		Assert.assertEquals(Sets.newHashSet("short"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals(2, parameter.getShort("short"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals(2, parameter.getShort("short"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedShortWithDefaultValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-short", "2"});
+		Assert.assertEquals(Sets.newHashSet("short"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals(2, parameter.getShort("short", (short) 0));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals(2, parameter.getShort("short", (short) 0));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedShortWithMissingValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-short"});
+		Assert.assertEquals(Sets.newHashSet("short"), parameter.getUnrequestedParameters());
+
+		exception.expect(RuntimeException.class);
+		exception.expectMessage("For input string: \"__NO_VALUE_KEY\"");
+
+		parameter.getShort("short");
+	}
+
+	// Int
+
+	@Test
+	public void testUnrequestedInt() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-int", "4"});
+		Assert.assertEquals(Sets.newHashSet("int"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals(4, parameter.getInt("int"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals(4, parameter.getInt("int"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedIntWithDefaultValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-int", "4"});
+		Assert.assertEquals(Sets.newHashSet("int"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals(4, parameter.getInt("int", 0));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals(4, parameter.getInt("int", 0));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedIntWithMissingValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-int"});
+		Assert.assertEquals(Sets.newHashSet("int"), parameter.getUnrequestedParameters());
+
+		exception.expect(RuntimeException.class);
+		exception.expectMessage("For input string: \"__NO_VALUE_KEY\"");
+
+		parameter.getInt("int");
+	}
+
+	// Long
+
+	@Test
+	public void testUnrequestedLong() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-long", "8"});
+		Assert.assertEquals(Sets.newHashSet("long"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals(8, parameter.getLong("long"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals(8, parameter.getLong("long"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedLongWithDefaultValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-long", "8"});
+		Assert.assertEquals(Sets.newHashSet("long"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals(8, parameter.getLong("long", 0));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals(8, parameter.getLong("long", 0));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedLongWithMissingValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-long"});
+		Assert.assertEquals(Sets.newHashSet("long"), parameter.getUnrequestedParameters());
+
+		exception.expect(RuntimeException.class);
+		exception.expectMessage("For input string: \"__NO_VALUE_KEY\"");
+
+		parameter.getLong("long");
+	}
+
+	// Float
+
+	@Test
+	public void testUnrequestedFloat() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-float", "4"});
+		Assert.assertEquals(Sets.newHashSet("float"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals(4.0, parameter.getFloat("float"), 0.00001);
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals(4.0, parameter.getFloat("float"), 0.00001);
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedFloatWithDefaultValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-float", "4"});
+		Assert.assertEquals(Sets.newHashSet("float"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals(4.0, parameter.getFloat("float", 0.0f), 0.00001);
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals(4.0, parameter.getFloat("float", 0.0f), 0.00001);
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedFloatWithMissingValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-float"});
+		Assert.assertEquals(Sets.newHashSet("float"), parameter.getUnrequestedParameters());
+
+		exception.expect(RuntimeException.class);
+		exception.expectMessage("For input string: \"__NO_VALUE_KEY\"");
+
+		parameter.getFloat("float");
+	}
+
+	// Double
+
+	@Test
+	public void testUnrequestedDouble() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-double", "8"});
+		Assert.assertEquals(Sets.newHashSet("double"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals(8.0, parameter.getDouble("double"), 0.00001);
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals(8.0, parameter.getDouble("double"), 0.00001);
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedDoubleWithDefaultValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-double", "8"});
+		Assert.assertEquals(Sets.newHashSet("double"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals(8.0, parameter.getDouble("double", 0.0), 0.00001);
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals(8.0, parameter.getDouble("double", 0.0), 0.00001);
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedDoubleWithMissingValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-double"});
+		Assert.assertEquals(Sets.newHashSet("double"), parameter.getUnrequestedParameters());
+
+		exception.expect(RuntimeException.class);
+		exception.expectMessage("For input string: \"__NO_VALUE_KEY\"");
+
+		parameter.getDouble("double");
+	}
+
+	// String
+
+	@Test
+	public void testUnrequestedString() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-string", "∞"});
+		Assert.assertEquals(Sets.newHashSet("string"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals("∞", parameter.get("string"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals("∞", parameter.get("string"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedStringWithDefaultValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-string", "∞"});
+		Assert.assertEquals(Sets.newHashSet("string"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals("∞", parameter.get("string", "0.0"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals("∞", parameter.get("string", "0.0"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedStringWithMissingValue() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-string"});
+		Assert.assertEquals(Sets.newHashSet("string"), parameter.getUnrequestedParameters());
+
+		parameter.get("string");
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	// Additional methods
+
+	@Test
+	public void testUnrequestedHas() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-boolean"});
+		Assert.assertEquals(Sets.newHashSet("boolean"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertTrue(parameter.has("boolean"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertTrue(parameter.has("boolean"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedRequired() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-required", "∞"});
+		Assert.assertEquals(Sets.newHashSet("required"), parameter.getUnrequestedParameters());
+
+		// test parameter access
+		Assert.assertEquals("∞", parameter.getRequired("required"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		// test repeated access
+		Assert.assertEquals("∞", parameter.getRequired("required"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedMultiple() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{"-boolean", "true", "-byte", "1",
+			"-short", "2", "-int", "4", "-long", "8", "-float", "4.0", "-double", "8.0", "-string", "∞"});
+		Assert.assertEquals(Sets.newHashSet("boolean", "byte", "short", "int", "long", "float", "double", "string"),
+			parameter.getUnrequestedParameters());
+
+		Assert.assertTrue(parameter.getBoolean("boolean"));
+		Assert.assertEquals(Sets.newHashSet("byte", "short", "int", "long", "float", "double", "string"),
+			parameter.getUnrequestedParameters());
+
+		Assert.assertEquals(1, parameter.getByte("byte"));
+		Assert.assertEquals(Sets.newHashSet("short", "int", "long", "float", "double", "string"),
+			parameter.getUnrequestedParameters());
+
+		Assert.assertEquals(2, parameter.getShort("short"));
+		Assert.assertEquals(Sets.newHashSet("int", "long", "float", "double", "string"),
+			parameter.getUnrequestedParameters());
+
+		Assert.assertEquals(4, parameter.getInt("int"));
+		Assert.assertEquals(Sets.newHashSet("long", "float", "double", "string"),
+			parameter.getUnrequestedParameters());
+
+		Assert.assertEquals(8, parameter.getLong("long"));
+		Assert.assertEquals(Sets.newHashSet("float", "double", "string"),
+			parameter.getUnrequestedParameters());
+
+		Assert.assertEquals(4.0, parameter.getFloat("float"), 0.00001);
+		Assert.assertEquals(Sets.newHashSet("double", "string"),
+			parameter.getUnrequestedParameters());
+
+		Assert.assertEquals(8.0, parameter.getDouble("double"), 0.00001);
+		Assert.assertEquals(Sets.newHashSet("string"),
+			parameter.getUnrequestedParameters());
+
+		Assert.assertEquals("∞", parameter.get("string"));
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
+
+	@Test
+	public void testUnrequestedUnknown() {
+		ParameterTool parameter = ParameterTool.fromArgs(new String[]{});
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+
+		Assert.assertTrue(parameter.getBoolean("boolean", true));
+		Assert.assertEquals(0, parameter.getByte("byte", (byte) 0));
+		Assert.assertEquals(0, parameter.getShort("short", (short) 0));
+		Assert.assertEquals(0, parameter.getInt("int", 0));
+		Assert.assertEquals(0, parameter.getLong("long", 0));
+		Assert.assertEquals(0, parameter.getFloat("float", 0), 0.00001);
+		Assert.assertEquals(0, parameter.getDouble("double", 0), 0.00001);
+		Assert.assertEquals("0", parameter.get("string", "0"));
+
+		Assert.assertEquals(Collections.emptySet(), parameter.getUnrequestedParameters());
+	}
 }


[4/7] flink git commit: [FLINK-6407] [build] Upgrade AVRO to 1.8.2

Posted by gr...@apache.org.
[FLINK-6407] [build] Upgrade AVRO to 1.8.2

This closes #4205


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/6a59e2fe
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/6a59e2fe
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/6a59e2fe

Branch: refs/heads/master
Commit: 6a59e2fe51d0c89fe0c152d4ad9b608c3ed9e546
Parents: b91df16
Author: zhangminglei <zm...@163.com>
Authored: Wed Jun 28 11:41:52 2017 +0800
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jul 11 08:51:22 2017 -0400

----------------------------------------------------------------------
 flink-connectors/flink-avro/pom.xml | 2 +-
 pom.xml                             | 4 ++--
 2 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/6a59e2fe/flink-connectors/flink-avro/pom.xml
----------------------------------------------------------------------
diff --git a/flink-connectors/flink-avro/pom.xml b/flink-connectors/flink-avro/pom.xml
index d057177..0ba9e8b 100644
--- a/flink-connectors/flink-avro/pom.xml
+++ b/flink-connectors/flink-avro/pom.xml
@@ -140,7 +140,7 @@ under the License.
 			<plugin>
 				<groupId>org.apache.avro</groupId>
 				<artifactId>avro-maven-plugin</artifactId>
-				<version>1.7.7</version>
+				<version>1.8.2</version>
 				<executions>
 					<execution>
 						<phase>generate-sources</phase>

http://git-wip-us.apache.org/repos/asf/flink/blob/6a59e2fe/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 92a827a..091769f 100644
--- a/pom.xml
+++ b/pom.xml
@@ -258,13 +258,13 @@ under the License.
 			<dependency>
 				<groupId>org.apache.avro</groupId>
 				<artifactId>avro</artifactId>
-				<version>1.7.7</version>
+				<version>1.8.2</version>
 			</dependency>
 			
 			<dependency>
 				<groupId>org.apache.avro</groupId>
 				<artifactId>avro-ipc</artifactId>
-				<version>1.7.7</version>
+				<version>1.8.2</version>
 			</dependency>
 
 			<!-- Make sure we use a consistent commons-cli version throughout the project -->


[6/7] flink git commit: [FLINK-7019] [gelly] Rework parallelism in Gelly algorithms and examples

Posted by gr...@apache.org.
http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
index e444b5c..a9f1752 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateEdgeValues.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Preconditions;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.asm.translate.Translate.translateEdgeValues;
 
 /**
@@ -42,9 +41,6 @@ extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> {
 	// Required configuration
 	private TranslateFunction<OLD, NEW> translator;
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Translate {@link Edge} values using the given {@link TranslateFunction}.
 	 *
@@ -56,43 +52,15 @@ extends GraphAlgorithmWrappingGraph<K, VV, OLD, K, VV, NEW> {
 		this.translator = translator;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public TranslateEdgeValues<K, VV, OLD, NEW> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!TranslateEdgeValues.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		TranslateEdgeValues rhs = (TranslateEdgeValues) other;
 
-		// verify that configurations can be merged
-
-		if (translator != rhs.translator) {
-			return false;
-		}
-
-		// merge configurations
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
+		return translator == rhs.translator;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
index 3eb5113..5568775 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateGraphIds.java
@@ -26,7 +26,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Preconditions;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.asm.translate.Translate.translateEdgeIds;
 import static org.apache.flink.graph.asm.translate.Translate.translateVertexIds;
 
@@ -44,9 +43,6 @@ extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
 	// Required configuration
 	private TranslateFunction<OLD, NEW> translator;
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Translate {@link Vertex} and {@link Edge} IDs of a {@link Graph} using the given {@link TranslateFunction}.
 	 *
@@ -58,43 +54,13 @@ extends GraphAlgorithmWrappingGraph<OLD, VV, EV, NEW, VV, EV> {
 		this.translator = translator;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public TranslateGraphIds<OLD, NEW, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!TranslateGraphIds.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		TranslateGraphIds rhs = (TranslateGraphIds) other;
 
-		// verify that configurations can be merged
-
-		if (translator != rhs.translator) {
-			return false;
-		}
-
-		// merge configurations
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
+		return translator == rhs.translator;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
index 38f415c..c9c94d7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/translate/TranslateVertexValues.java
@@ -25,7 +25,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Preconditions;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.asm.translate.Translate.translateVertexValues;
 
 /**
@@ -42,9 +41,6 @@ extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> {
 	// Required configuration
 	private TranslateFunction<OLD, NEW> translator;
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Translate {@link Vertex} values using the given {@link TranslateFunction}.
 	 *
@@ -56,43 +52,13 @@ extends GraphAlgorithmWrappingGraph<K, OLD, EV, K, NEW, EV> {
 		this.translator = translator;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public TranslateVertexValues<K, OLD, NEW, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!TranslateVertexValues.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		TranslateVertexValues rhs = (TranslateVertexValues) other;
 
-		// verify that configurations can be merged
-
-		if (translator != rhs.translator) {
-			return false;
-		}
-
-		// merge configurations
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
+		return translator == rhs.translator;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorBase.java
index 1c4d097..a1906e1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorBase.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/generator/GraphGeneratorBase.java
@@ -18,6 +18,8 @@
 
 package org.apache.flink.graph.generator;
 
+import org.apache.flink.util.Preconditions;
+
 import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
@@ -35,6 +37,9 @@ implements GraphGenerator<K, VV, EV> {
 
 	@Override
 	public GraphGenerator<K, VV, EV> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
 		this.parallelism = parallelism;
 
 		return this;

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
index 7357fbc..9e26dfe 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/TriangleListingBase.java
@@ -22,9 +22,6 @@ package org.apache.flink.graph.library.clustering;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Common configuration for directed and undirected Triangle Listing algorithms.
@@ -40,8 +37,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, R> {
 	// Optional configuration
 	protected OptionalBoolean sortTriangleVertices = new OptionalBoolean(false, true);
 
-	protected int littleParallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Normalize the triangle listing such that for each result (K0, K1, K2)
 	 * the vertex IDs are sorted K0 < K1 < K2.
@@ -55,37 +50,12 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, R> {
 		return this;
 	}
 
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public TriangleListingBase<K, VV, EV, R> setLittleParallelism(int littleParallelism) {
-		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	@Override
-	protected final boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!TriangleListingBase.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected final void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		TriangleListingBase rhs = (TriangleListingBase) other;
 
-		// merge configurations
-
 		sortTriangleVertices.mergeWith(rhs.sortTriangleVertices);
-		littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
-			((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
-
-		return true;
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
index bfeb3d5..2ff5dc2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/AverageClusteringCoefficient.java
@@ -33,8 +33,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 
 import java.io.IOException;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * The average clustering coefficient measures the mean connectedness of a
  * graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph).
@@ -52,21 +50,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private AverageClusteringCoefficientHelper<K> averageClusteringCoefficientHelper;
 
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public AverageClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	/*
 	 * Implementation notes:
 	 *
@@ -81,7 +64,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 		DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
 			.run(new LocalClusteringCoefficient<K, VV, EV>()
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		averageClusteringCoefficientHelper = new AverageClusteringCoefficientHelper<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
index 03f06b1..a6b0baa 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/GlobalClusteringCoefficient.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.library.clustering.directed;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.asm.dataset.Count;
 import org.apache.flink.graph.asm.result.PrintableResult;
@@ -30,8 +31,6 @@ import org.apache.flink.types.CopyableValue;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * The global clustering coefficient measures the connectedness of a graph.
  * Scores range from 0.0 (no triangles) to 1.0 (complete graph).
@@ -45,22 +44,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private Count<TriangleListing.Result<K>> triangleCount;
 
-	private VertexMetrics<K, VV, EV> vertexMetrics;
-
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public GlobalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
+	private GraphAnalytic<K, VV, EV, VertexMetrics.Result> vertexMetrics;
 
 	/*
 	 * Implementation notes:
@@ -79,12 +63,12 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		DataSet<TriangleListing.Result<K>> triangles = input
 			.run(new TriangleListing<K, VV, EV>()
 				.setSortTriangleVertices(false)
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		triangleCount.run(triangles);
 
 		vertexMetrics = new VertexMetrics<K, VV, EV>()
-			.setParallelism(littleParallelism);
+			.setParallelism(parallelism);
 
 		input.run(vertexMetrics);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
index 03c8808..981110f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/LocalClusteringCoefficient.java
@@ -41,9 +41,6 @@ import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * The local clustering coefficient measures the connectedness of each vertex's
@@ -67,8 +64,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	// Optional configuration
 	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true);
 
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default the vertex set is checked for zero degree vertices. When this
 	 * flag is disabled only clustering coefficient scores for vertices with
@@ -84,45 +79,24 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		return this;
 	}
 
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public LocalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
 
-		// verify that configurations can be merged
+		return !includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices);
+	}
 
-		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
 
 		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-
-		littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
-			((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
-
-		return true;
 	}
 
 	/*
@@ -141,7 +115,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		// u, v, w, bitmask
 		DataSet<TriangleListing.Result<K>> triangles = input
 			.run(new TriangleListing<K, VV, EV>()
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// u, edge count
 		DataSet<Tuple2<K, LongValue>> triangleVertices = triangles
@@ -159,7 +133,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Vertex<K, Degrees>> vertexDegree = input
 			.run(new VertexDegrees<K, VV, EV>()
 				.setIncludeZeroDegreeVertices(includeZeroDegreeVertices.get())
-				.setParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// u, deg(u), triangle count
 		return vertexDegree
@@ -167,7 +141,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.where(0)
 			.equalTo(0)
 			.with(new JoinVertexDegreeWithTriangleCount<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Clustering coefficient");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
index 6ec9f0f..93eadc5 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriadicCensus.java
@@ -37,8 +37,6 @@ import java.io.IOException;
 import java.math.BigInteger;
 import java.text.NumberFormat;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * A triad is formed by three connected or unconnected vertices in a graph.
  * The triadic census counts the occurrences of each type of triad.
@@ -56,21 +54,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private VertexDegreesHelper<K> vertexDegreesHelper;
 
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public TriadicCensus<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	@Override
 	public TriadicCensus<K, VV, EV> run(Graph<K, VV, EV> input)
 			throws Exception {
@@ -80,7 +63,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 		input
 			.run(new TriangleListing<K, VV, EV>()
-				.setLittleParallelism(littleParallelism))
+				.setParallelism(parallelism))
 			.output(triangleListingHelper)
 				.name("Triangle counts");
 
@@ -88,7 +71,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 		input
 			.run(new VertexDegrees<K, VV, EV>()
-				.setParallelism(littleParallelism))
+				.setParallelism(parallelism))
 			.output(vertexDegreesHelper)
 				.name("Edge and triplet counts");
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
index 23e88d1..beddd24 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/directed/TriangleListing.java
@@ -84,26 +84,26 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 		DataSet<Tuple3<K, K, ByteValue>> filteredByID = input
 			.getEdges()
 			.map(new OrderByID<K, EV>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Order by ID")
 			.groupBy(0, 1)
 			.reduceGroup(new ReduceBitmask<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Flatten by ID");
 
 		// u, v, (deg(u), deg(v))
 		DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> pairDegrees = input
 			.run(new EdgeDegreesPair<K, VV, EV>()
-				.setParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// u, v, bitmask where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
 		DataSet<Tuple3<K, K, ByteValue>> filteredByDegree = pairDegrees
 			.map(new OrderByDegree<K, EV>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Order by degree")
 			.groupBy(0, 1)
 			.reduceGroup(new ReduceBitmask<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Flatten by degree");
 
 		// u, v, w, bitmask where (u, v) and (u, w) are edges in graph

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
index c2c2ad2..c0ddb05 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/AverageClusteringCoefficient.java
@@ -33,8 +33,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 
 import java.io.IOException;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * The average clustering coefficient measures the mean connectedness of a
  * graph. Scores range from 0.0 (no triangles) to 1.0 (complete graph).
@@ -52,21 +50,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private AverageClusteringCoefficientHelper<K> averageClusteringCoefficientHelper;
 
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public AverageClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	/*
 	 * Implementation notes:
 	 *
@@ -81,7 +64,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 		DataSet<LocalClusteringCoefficient.Result<K>> localClusteringCoefficient = input
 			.run(new LocalClusteringCoefficient<K, VV, EV>()
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		averageClusteringCoefficientHelper = new AverageClusteringCoefficientHelper<>();
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
index 5377c1f..ef212ae 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/GlobalClusteringCoefficient.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.library.clustering.undirected;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.asm.dataset.Count;
 import org.apache.flink.graph.asm.result.PrintableResult;
@@ -30,8 +31,6 @@ import org.apache.flink.types.CopyableValue;
 import org.apache.commons.lang3.builder.EqualsBuilder;
 import org.apache.commons.lang3.builder.HashCodeBuilder;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * The global clustering coefficient measures the connectedness of a graph.
  * Scores range from 0.0 (no triangles) to 1.0 (complete graph).
@@ -45,22 +44,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private Count<TriangleListing.Result<K>> triangleCount;
 
-	private VertexMetrics<K, VV, EV> vertexMetrics;
-
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public GlobalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
+	private GraphAnalytic<K, VV, EV, VertexMetrics.Result> vertexMetrics;
 
 	/*
 	 * Implementation notes:
@@ -79,12 +63,12 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		DataSet<TriangleListing.Result<K>> triangles = input
 			.run(new TriangleListing<K, VV, EV>()
 				.setSortTriangleVertices(false)
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		triangleCount.run(triangles);
 
 		vertexMetrics = new VertexMetrics<K, VV, EV>()
-			.setParallelism(littleParallelism);
+			.setParallelism(parallelism);
 
 		input.run(vertexMetrics);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
index 3cb4b87..0beb989 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/LocalClusteringCoefficient.java
@@ -40,9 +40,6 @@ import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.CopyableValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * The local clustering coefficient measures the connectedness of each vertex's
@@ -66,8 +63,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 	// Optional configuration
 	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(true, true);
 
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default the vertex set is checked for zero degree vertices. When this
 	 * flag is disabled only clustering coefficient scores for vertices with
@@ -83,44 +78,24 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		return this;
 	}
 
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public LocalClusteringCoefficient<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!LocalClusteringCoefficient.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
 
-		// verify that configurations can be merged
+		return !includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices);
+	}
 
-		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		LocalClusteringCoefficient rhs = (LocalClusteringCoefficient) other;
 
 		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-		littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
-			((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
-
-		return true;
 	}
 
 	/*
@@ -139,7 +114,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		// u, v, w
 		DataSet<TriangleListing.Result<K>> triangles = input
 			.run(new TriangleListing<K, VV, EV>()
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// u, 1
 		DataSet<Tuple2<K, LongValue>> triangleVertices = triangles
@@ -157,7 +132,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		DataSet<Vertex<K, LongValue>> vertexDegree = input
 			.run(new VertexDegree<K, VV, EV>()
 				.setIncludeZeroDegreeVertices(includeZeroDegreeVertices.get())
-				.setParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// u, deg(u), triangle count
 		return vertexDegree
@@ -165,7 +140,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.where(0)
 			.equalTo(0)
 			.with(new JoinVertexDegreeWithTriangleCount<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Clustering coefficient");
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
index c5a323d..f440098 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriadicCensus.java
@@ -20,6 +20,7 @@ package org.apache.flink.graph.library.clustering.undirected;
 
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
+import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.GraphAnalyticBase;
 import org.apache.flink.graph.asm.dataset.Count;
 import org.apache.flink.graph.asm.result.PrintableResult;
@@ -34,8 +35,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.math.BigInteger;
 import java.text.NumberFormat;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * A triad is formed by three connected or unconnected vertices in a graph.
  * The triadic census counts the occurrences of each type of triad.
@@ -54,22 +53,7 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private Count<TriangleListing.Result<K>> triangleCount;
 
-	private VertexMetrics<K, VV, EV> vertexMetrics;
-
-	// Optional configuration
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public TriadicCensus<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
+	private GraphAnalytic<K, VV, EV, VertexMetrics.Result> vertexMetrics;
 
 	@Override
 	public TriadicCensus<K, VV, EV> run(Graph<K, VV, EV> input)
@@ -81,12 +65,12 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		DataSet<TriangleListing.Result<K>> triangles = input
 			.run(new TriangleListing<K, VV, EV>()
 				.setSortTriangleVertices(false)
-				.setLittleParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		triangleCount.run(triangles);
 
 		vertexMetrics = new VertexMetrics<K, VV, EV>()
-			.setParallelism(littleParallelism);
+			.setParallelism(parallelism);
 
 		input.run(vertexMetrics);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
index 9f93b4b..c714bed 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/clustering/undirected/TriangleListing.java
@@ -84,18 +84,18 @@ extends TriangleListingBase<K, VV, EV, Result<K>> {
 		DataSet<Tuple2<K, K>> filteredByID = input
 			.getEdges()
 			.flatMap(new FilterByID<K, EV>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Filter by ID");
 
 		// u, v, (edge value, deg(u), deg(v))
 		DataSet<Edge<K, Tuple3<EV, LongValue, LongValue>>> pairDegree = input
 			.run(new EdgeDegreePair<K, VV, EV>()
-				.setParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// u, v where deg(u) < deg(v) or (deg(u) == deg(v) and u < v)
 		DataSet<Tuple2<K, K>> filteredByDegree = pairDegree
 			.flatMap(new FilterByDegree<K, EV>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Filter by degree");
 
 		// u, v, w where (u, v) and (u, w) are edges in graph, v < w

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
index 8bc30d3..6b41ee4 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/HITS.java
@@ -49,8 +49,6 @@ import org.apache.flink.util.Preconditions;
 
 import java.util.Collection;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Hyperlink-Induced Topic Search computes two interdependent scores for every
  * vertex in a directed graph. A good "hub" links to good "authorities" and
@@ -79,9 +77,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	private double convergenceThreshold;
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Hyperlink-Induced Topic Search with a fixed number of iterations.
 	 *
@@ -119,36 +114,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		this.convergenceThreshold = convergenceThreshold;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public HITS<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!HITS.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		HITS rhs = (HITS) other;
 
-		// merge configurations
-
 		maxIterations = Math.max(maxIterations, rhs.maxIterations);
 		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override
@@ -172,7 +145,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.name("Sum");
 
 		IterativeDataSet<Tuple3<K, DoubleValue, DoubleValue>> iterative = initialScores
-			.iterate(maxIterations);
+			.iterate(maxIterations)
+			.setParallelism(parallelism);
 
 		// ID, hubbiness
 		DataSet<Tuple2<K, DoubleValue>> hubbiness = iterative

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
index c82f68c..af56e50 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/linkanalysis/PageRank.java
@@ -56,8 +56,6 @@ import org.apache.flink.util.Preconditions;
 import java.util.Collection;
 import java.util.Iterator;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * PageRank computes a per-vertex score which is the sum of PageRank scores
  * transmitted over in-edges. Each vertex's score is divided evenly among
@@ -86,9 +84,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	private double convergenceThreshold;
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * PageRank with a fixed number of iterations.
 	 *
@@ -131,36 +126,14 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		this.convergenceThreshold = convergenceThreshold;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public PageRank<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!PageRank.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		PageRank rhs = (PageRank) other;
 
-		// merge configurations
-
 		maxIterations = Math.max(maxIterations, rhs.maxIterations);
 		convergenceThreshold = Math.min(convergenceThreshold, rhs.convergenceThreshold);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override
@@ -197,7 +170,8 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 				.name("Initialize scores");
 
 		IterativeDataSet<Tuple2<K, DoubleValue>> iterative = initialScores
-			.iterate(maxIterations);
+			.iterate(maxIterations)
+			.setParallelism(parallelism);
 
 		// s, projected pagerank(s)
 		DataSet<Tuple2<K, DoubleValue>> vertexScores = iterative

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
index 30d3563..c88e401 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/EdgeMetrics.java
@@ -46,8 +46,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.io.IOException;
 import java.text.NumberFormat;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Compute the following edge metrics in a directed graph.
  *  - number of triangle triplets
@@ -72,20 +70,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private EdgeMetricsHelper<K> edgeMetricsHelper;
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeMetrics<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	/*
 	 * Implementation notes:
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
index 3931f65..97ee6fa 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/directed/VertexMetrics.java
@@ -36,8 +36,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.io.IOException;
 import java.text.NumberFormat;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Compute the following vertex metrics in a directed graph.
  *  - number of vertices
@@ -79,8 +77,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 	// Optional configuration
 	private boolean includeZeroDegreeVertices = false;
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default only the edge set is processed for the computation of degree.
 	 * When this flag is set an additional join is performed against the vertex
@@ -96,18 +92,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public VertexMetrics<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
 	public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
index d1c7fb7..4c0d654 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/EdgeMetrics.java
@@ -41,8 +41,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.io.IOException;
 import java.text.NumberFormat;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Compute the following edge metrics in an undirected graph.
  *  - number of triangle triplets
@@ -70,8 +68,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 	// Optional configuration
 	private boolean reduceOnTargetId = false;
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * The degree can be counted from either the edge source or target IDs.
 	 * By default the source IDs are counted. Reducing on target IDs may
@@ -87,18 +83,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeMetrics<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	/*
 	 * Implementation notes:
 	 *

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
index 000f4e0..1116149 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/metric/undirected/VertexMetrics.java
@@ -36,8 +36,6 @@ import org.apache.commons.lang3.builder.HashCodeBuilder;
 import java.io.IOException;
 import java.text.NumberFormat;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Compute the following vertex metrics in an undirected graph.
  *  - number of vertices
@@ -71,8 +69,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 
 	private boolean reduceOnTargetId = false;
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default only the edge set is processed for the computation of degree.
 	 * When this flag is set an additional join is performed against the vertex
@@ -103,18 +99,6 @@ extends GraphAnalyticBase<K, VV, EV, Result> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public VertexMetrics<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
 	public VertexMetrics<K, VV, EV> run(Graph<K, VV, EV> input)
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
index 5f1ac3c..78dff0b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/AdamicAdar.java
@@ -51,8 +51,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * http://social.cs.uiuc.edu/class/cs591kgk/friendsadamic.pdf
  *
@@ -84,8 +82,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	private float minimumRatio = 0.0f;
 
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Filter out Adamic-Adar scores less than the given minimum.
 	 *
@@ -114,44 +110,15 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		return this;
 	}
 
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public AdamicAdar<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!AdamicAdar.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		AdamicAdar rhs = (AdamicAdar) other;
 
-		// verify that configurations can be merged
-
-		if (minimumRatio != rhs.minimumRatio ||
-			minimumScore != rhs.minimumScore) {
-			return false;
-		}
-
-		// merge configurations
-
-		littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
-			((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
-
-		return true;
+		return minimumRatio == rhs.minimumRatio && minimumScore == rhs.minimumScore;
 	}
 
 	/*
@@ -168,9 +135,9 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		// s, d(s), 1/log(d(s))
 		DataSet<Tuple3<K, LongValue, FloatValue>> inverseLogDegree = input
 			.run(new VertexDegree<K, VV, EV>()
-				.setParallelism(littleParallelism))
+				.setParallelism(parallelism))
 			.map(new VertexInverseLogDegree<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Vertex score");
 
 		// s, t, 1/log(d(s))
@@ -181,7 +148,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.equalTo(0)
 			.projectFirst(0, 1)
 			.<Tuple3<K, K, FloatValue>>projectSecond(2)
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Edge score");
 
 		// group span, s, t, 1/log(d(s))
@@ -189,16 +156,16 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			.groupBy(0)
 			.sortGroup(1, Order.ASCENDING)
 			.reduceGroup(new GenerateGroupSpans<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Generate group spans");
 
 		// group, s, t, 1/log(d(s))
 		DataSet<Tuple4<IntValue, K, K, FloatValue>> groups = groupSpans
 			.rebalance()
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Rebalance")
 			.flatMap(new GenerateGroups<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Generate groups");
 
 		// t, u, 1/log(d(s)) where (s, t) and (s, u) are edges in graph
@@ -218,7 +185,7 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 			// total score, number of pairs of neighbors
 			DataSet<Tuple2<FloatValue, LongValue>> sumOfScoresAndNumberOfNeighborPairs = inverseLogDegree
 				.map(new ComputeScoreFromVertex<K>())
-					.setParallelism(littleParallelism)
+					.setParallelism(parallelism)
 					.name("Average score")
 				.sum(0)
 				.andSum(1);

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
index 64f325c..8e820ac 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/library/similarity/JaccardIndex.java
@@ -45,8 +45,6 @@ import org.apache.flink.util.Preconditions;
 import java.util.ArrayList;
 import java.util.List;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * The Jaccard Index measures the similarity between vertex neighborhoods and
  * is computed as the number of shared neighbors divided by the number of
@@ -84,8 +82,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 
 	private boolean mirrorResults;
 
-	private int littleParallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Override the default group size for the quadratic expansion of neighbor
 	 * pairs. Small groups generate more data whereas large groups distribute
@@ -158,49 +154,29 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		return this;
 	}
 
-	/**
-	 * Override the parallelism of operators processing small amounts of data.
-	 *
-	 * @param littleParallelism operator parallelism
-	 * @return this
-	 */
-	public JaccardIndex<K, VV, EV> setLittleParallelism(int littleParallelism) {
-		Preconditions.checkArgument(littleParallelism > 0 || littleParallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.littleParallelism = littleParallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!JaccardIndex.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		JaccardIndex rhs = (JaccardIndex) other;
 
-		// verify that configurations can be merged
+		return unboundedScores == rhs.unboundedScores &&
+			minimumScoreNumerator == rhs.minimumScoreNumerator &&
+			minimumScoreDenominator == rhs.minimumScoreDenominator &&
+			maximumScoreNumerator == rhs.maximumScoreNumerator &&
+			maximumScoreDenominator == rhs.maximumScoreDenominator &&
+			mirrorResults == rhs.mirrorResults;
+	}
 
-		if (unboundedScores != rhs.unboundedScores ||
-			minimumScoreNumerator != rhs.minimumScoreNumerator ||
-			minimumScoreDenominator != rhs.minimumScoreDenominator ||
-			maximumScoreNumerator != rhs.maximumScoreNumerator ||
-			maximumScoreDenominator != rhs.maximumScoreDenominator ||
-			mirrorResults != rhs.mirrorResults) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		JaccardIndex rhs = (JaccardIndex) other;
 
 		groupSize = Math.max(groupSize, rhs.groupSize);
-		littleParallelism = (littleParallelism == PARALLELISM_DEFAULT) ? rhs.littleParallelism :
-			((rhs.littleParallelism == PARALLELISM_DEFAULT) ? littleParallelism : Math.min(littleParallelism, rhs.littleParallelism));
-
-		return true;
 	}
 
 	/*
@@ -217,23 +193,23 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Result<K>> {
 		// s, t, d(t)
 		DataSet<Edge<K, Tuple2<EV, LongValue>>> neighborDegree = input
 			.run(new EdgeTargetDegree<K, VV, EV>()
-				.setParallelism(littleParallelism));
+				.setParallelism(parallelism));
 
 		// group span, s, t, d(t)
 		DataSet<Tuple4<IntValue, K, K, IntValue>> groupSpans = neighborDegree
 			.groupBy(0)
 			.sortGroup(1, Order.ASCENDING)
 			.reduceGroup(new GenerateGroupSpans<K, EV>(groupSize))
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Generate group spans");
 
 		// group, s, t, d(t)
 		DataSet<Tuple4<IntValue, K, K, IntValue>> groups = groupSpans
 			.rebalance()
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Rebalance")
 			.flatMap(new GenerateGroups<K>())
-				.setParallelism(littleParallelism)
+				.setParallelism(parallelism)
 				.name("Generate groups");
 
 		// t, u, d(t)+d(u)

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingBase.java
index 21aa971..2f5aa83 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingBase.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingBase.java
@@ -22,6 +22,9 @@ package org.apache.flink.graph.utils.proxy;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAlgorithm;
+import org.apache.flink.util.Preconditions;
+
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * A {@link GraphAlgorithm} transforms an input {@link Graph} into an output of
@@ -39,6 +42,29 @@ import org.apache.flink.graph.GraphAlgorithm;
 public abstract class GraphAlgorithmWrappingBase<K, VV, EV, R>
 implements GraphAlgorithm<K, VV, EV, R> {
 
+	protected int parallelism = PARALLELISM_DEFAULT;
+
+	/**
+	 * Set the parallelism for this algorithm's operators. This parameter is
+	 * necessary because processing a small amount of data with high operator
+	 * parallelism is slow and wasteful with memory and buffers.
+	 *
+	 * <p>Operator parallelism should be set to this given value unless
+	 * processing asymptotically more data, in which case the default job
+	 * parallelism should be inherited.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public GraphAlgorithmWrappingBase<K, VV, EV, R> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
 	/**
 	 * Algorithms are identified by name rather than by class to allow subclassing.
 	 *
@@ -50,12 +76,34 @@ implements GraphAlgorithm<K, VV, EV, R> {
 	}
 
 	/**
-	 * An algorithm must first test whether the configurations can be merged
-	 * before merging individual fields.
+	 * First test whether the algorithm configurations can be merged before the
+	 * call to {@link #mergeConfiguration}.
 	 *
-	 * @param other the algorithm with which to compare and merge
-	 * @return true if and only if configuration has been merged and the
-	 *          algorithm's output can be reused
+	 * @param other the algorithm with which to compare configuration
+	 * @return true if and only if configuration can be merged and the
+	 *         algorithm's output can be reused
+	 *
+	 * @see #mergeConfiguration(GraphAlgorithmWrappingBase)
+	 */
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		Preconditions.checkNotNull(other);
+
+		return this.getClass().equals(other.getClass());
+	}
+
+	/**
+	 * Merge the other configuration into this algorithm's after the call to
+	 * {@link #canMergeConfigurationWith} has checked that the configurations
+	 * can be merged.
+	 *
+	 * @param other the algorithm from which to merge configuration
+	 *
+	 * @see #canMergeConfigurationWith(GraphAlgorithmWrappingBase)
 	 */
-	protected abstract boolean mergeConfiguration(GraphAlgorithmWrappingBase<K, VV, EV, R> other);
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		Preconditions.checkNotNull(other);
+
+		parallelism = (parallelism == PARALLELISM_DEFAULT) ? other.parallelism :
+			((other.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, other.parallelism));
+	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
index b329f1e..78a79ab 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingDataSet.java
@@ -101,7 +101,9 @@ extends GraphAlgorithmWrappingBase<K, VV, EV, DataSet<T>> {
 
 		if (cache.containsKey(this)) {
 			for (GraphAlgorithmWrappingDataSet<K, VV, EV, T> other : cache.get(this)) {
-				if (mergeConfiguration(other)) {
+				if (canMergeConfigurationWith(other)) {
+					mergeConfiguration(other);
+
 					// configuration has been merged so generate new output
 					DataSet<T> output = runInternal(input);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
index 09b5f5e..4f77fb7 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/utils/proxy/GraphAlgorithmWrappingGraph.java
@@ -106,7 +106,9 @@ extends GraphAlgorithmWrappingBase<IN_K, IN_VV, IN_EV, Graph<OUT_K, OUT_VV, OUT_
 
 		if (cache.containsKey(this)) {
 			for (GraphAlgorithmWrappingGraph<IN_K, IN_VV, IN_EV, OUT_K, OUT_VV, OUT_EV> other : cache.get(this)) {
-				if (mergeConfiguration(other)) {
+				if (canMergeConfigurationWith(other)) {
+					mergeConfiguration(other);
+
 					// configuration has been merged so generate new output
 					Graph<OUT_K, OUT_VV, OUT_EV> output = runInternal(input);
 


[7/7] flink git commit: [FLINK-7019] [gelly] Rework parallelism in Gelly algorithms and examples

Posted by gr...@apache.org.
[FLINK-7019] [gelly] Rework parallelism in Gelly algorithms and examples

Flink job parallelism is set with ExecutionConfig#setParallelism or with
-p on the command-line. The Gelly algorithms JaccardIndex, AdamicAdar,
TriangleListing, and ClusteringCoefficient have intermediate operators
which generate output quadratic in the size of input. These algorithms
may need to be run with a high parallelism but doing so for all
operations is wasteful. Thus was introduced "little parallelism".

This can be simplified by moving the parallelism parameter to the new
common base class with the rule-of-thumb to use the algorithm
parallelism for all normal (small output) operators. The asymptotically
large operators will default to the job parallelism, as will the default
algorithm parallelism.

This closes #4282


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/d0cc2c17
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/d0cc2c17
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/d0cc2c17

Branch: refs/heads/master
Commit: d0cc2c178714987ba23998486651791d04a5beb1
Parents: 273223f
Author: Greg Hogan <co...@greghogan.com>
Authored: Mon Jun 26 10:21:50 2017 -0400
Committer: Greg Hogan <co...@greghogan.com>
Committed: Tue Jul 11 08:51:23 2017 -0400

----------------------------------------------------------------------
 docs/dev/libs/gelly/library_methods.md          | 14 ++---
 .../apache/flink/graph/drivers/AdamicAdar.java  | 10 +---
 .../graph/drivers/ClusteringCoefficient.java    | 20 +++----
 .../apache/flink/graph/drivers/DriverBase.java  |  6 ++
 .../apache/flink/graph/drivers/EdgeList.java    |  3 +-
 .../flink/graph/drivers/GraphMetrics.java       | 12 ++--
 .../org/apache/flink/graph/drivers/HITS.java    |  5 +-
 .../flink/graph/drivers/JaccardIndex.java       |  9 +--
 .../apache/flink/graph/drivers/PageRank.java    |  7 ++-
 .../flink/graph/drivers/TriangleListing.java    | 16 ++----
 .../graph/drivers/input/CirculantGraph.java     |  6 +-
 .../graph/drivers/input/CompleteGraph.java      |  6 +-
 .../flink/graph/drivers/input/CycleGraph.java   |  6 +-
 .../flink/graph/drivers/input/EchoGraph.java    |  6 +-
 .../flink/graph/drivers/input/EmptyGraph.java   |  1 +
 .../graph/drivers/input/GeneratedGraph.java     |  6 ++
 .../flink/graph/drivers/input/GridGraph.java    |  8 +--
 .../graph/drivers/input/HypercubeGraph.java     |  6 +-
 .../flink/graph/drivers/input/PathGraph.java    |  6 +-
 .../flink/graph/drivers/input/RMatGraph.java    | 11 +---
 .../graph/drivers/input/SingletonEdgeGraph.java |  6 +-
 .../flink/graph/drivers/input/StarGraph.java    |  6 +-
 .../apache/flink/graph/GraphAnalyticBase.java   | 25 ++++++++
 .../annotate/directed/EdgeDegreesPair.java      | 35 ------------
 .../annotate/directed/EdgeSourceDegrees.java    | 35 ------------
 .../annotate/directed/EdgeTargetDegrees.java    | 35 ------------
 .../degree/annotate/directed/VertexDegrees.java | 38 +++----------
 .../annotate/directed/VertexInDegree.java       | 41 +++----------
 .../annotate/directed/VertexOutDegree.java      | 41 +++----------
 .../annotate/undirected/EdgeDegreePair.java     | 32 +----------
 .../annotate/undirected/EdgeSourceDegree.java   | 32 +----------
 .../annotate/undirected/EdgeTargetDegree.java   | 32 +----------
 .../annotate/undirected/VertexDegree.java       | 41 +++----------
 .../degree/filter/undirected/MaximumDegree.java | 37 +++---------
 .../graph/asm/simple/directed/Simplify.java     | 38 -------------
 .../graph/asm/simple/undirected/Simplify.java   | 40 +------------
 .../asm/translate/TranslateEdgeValues.java      | 38 +------------
 .../graph/asm/translate/TranslateGraphIds.java  | 40 +------------
 .../asm/translate/TranslateVertexValues.java    | 40 +------------
 .../graph/generator/GraphGeneratorBase.java     |  5 ++
 .../library/clustering/TriangleListingBase.java | 34 +----------
 .../directed/AverageClusteringCoefficient.java  | 19 +------
 .../directed/GlobalClusteringCoefficient.java   | 24 ++------
 .../directed/LocalClusteringCoefficient.java    | 48 ++++------------
 .../clustering/directed/TriadicCensus.java      | 21 +------
 .../clustering/directed/TriangleListing.java    | 10 ++--
 .../AverageClusteringCoefficient.java           | 19 +------
 .../undirected/GlobalClusteringCoefficient.java | 24 ++------
 .../undirected/LocalClusteringCoefficient.java  | 47 ++++-----------
 .../clustering/undirected/TriadicCensus.java    | 24 ++------
 .../clustering/undirected/TriangleListing.java  |  6 +-
 .../flink/graph/library/linkanalysis/HITS.java  | 34 ++---------
 .../graph/library/linkanalysis/PageRank.java    | 34 ++---------
 .../library/metric/directed/EdgeMetrics.java    | 16 ------
 .../library/metric/directed/VertexMetrics.java  | 16 ------
 .../library/metric/undirected/EdgeMetrics.java  | 16 ------
 .../metric/undirected/VertexMetrics.java        | 16 ------
 .../graph/library/similarity/AdamicAdar.java    | 53 ++++-------------
 .../graph/library/similarity/JaccardIndex.java  | 58 ++++++-------------
 .../utils/proxy/GraphAlgorithmWrappingBase.java | 60 ++++++++++++++++++--
 .../proxy/GraphAlgorithmWrappingDataSet.java    |  4 +-
 .../proxy/GraphAlgorithmWrappingGraph.java      |  4 +-
 62 files changed, 293 insertions(+), 1095 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/docs/dev/libs/gelly/library_methods.md
----------------------------------------------------------------------
diff --git a/docs/dev/libs/gelly/library_methods.md b/docs/dev/libs/gelly/library_methods.md
index 026bea4..de53aaf 100644
--- a/docs/dev/libs/gelly/library_methods.md
+++ b/docs/dev/libs/gelly/library_methods.md
@@ -225,7 +225,7 @@ Directed and undirected variants are provided. The analytics take a simple graph
 containing the total number of vertices and average clustering coefficient of the graph. The graph ID type must be
 `Comparable` and `Copyable`.
 
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 
 ### Global Clustering Coefficient
 
@@ -244,7 +244,7 @@ Directed and undirected variants are provided. The analytics take a simple graph
 containing the total number of triplets and triangles in the graph. The result class provides a method to compute the
 global clustering coefficient score. The graph ID type must be `Comparable` and `Copyable`.
 
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 
 ### Local Clustering Coefficient
 
@@ -266,7 +266,7 @@ provides a method to compute the local clustering coefficient score. The graph I
 `Copyable`.
 
 * `setIncludeZeroDegreeVertices`: include results for vertices with a degree of zero
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 
 ### Triadic Census
 
@@ -286,7 +286,7 @@ Directed and undirected variants are provided. The analytics take a simple graph
 `AnalyticResult` with accessor methods for querying the count of each triad type. The graph ID type must be
 `Comparable` and `Copyable`.
 
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 
 ### Triangle Listing
 
@@ -306,7 +306,7 @@ Directed and undirected variants are provided. The algorithms take a simple grap
 `TertiaryResult` containing the three triangle vertices and, for the directed algorithm, a bitmask marking each of the
 six potential edges connecting the three vertices. The graph ID type must be `Comparable` and `Copyable`.
 
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 * `setSortTriangleVertices`: normalize the triangle listing such that for each result (K0, K1, K2) the vertex IDs are sorted K0 < K1 < K2
 
 ## Link Analysis
@@ -424,9 +424,9 @@ See the [Jaccard Index](#jaccard-index) library method for a similar algorithm.
 The algorithm takes a simple undirected graph as input and outputs a `DataSet` of `BinaryResult` containing two vertex
 IDs and the Adamic-Adar similarity score. The graph ID type must be `Copyable`.
 
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
 * `setMinimumRatio`: filter out Adamic-Adar scores less than the given ratio times the average score
 * `setMinimumScore`: filter out Adamic-Adar scores less than the given minimum
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 
 ### Jaccard Index
 
@@ -448,8 +448,8 @@ The algorithm takes a simple undirected graph as input and outputs a `DataSet` o
 the number of shared neighbors, and the number of distinct neighbors. The result class provides a method to compute the
 Jaccard Index score. The graph ID type must be `Copyable`.
 
-* `setLittleParallelism`: override the parallelism of operators processing small amounts of data
 * `setMaximumScore`: filter out Jaccard Index scores greater than or equal to the given maximum fraction
 * `setMinimumScore`: filter out Jaccard Index scores less than the given minimum fraction
+* `setParallelism`: override the parallelism of operators processing small amounts of data
 
 {% top %}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
index e439ccd..5dd8ea1 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/AdamicAdar.java
@@ -21,14 +21,11 @@ package org.apache.flink.graph.drivers;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.drivers.parameter.DoubleParameter;
-import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.CopyableValue;
 
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Driver for {@link org.apache.flink.graph.library.similarity.AdamicAdar}.
  */
@@ -43,9 +40,6 @@ extends DriverBase<K, VV, EV> {
 		.setDefaultValue(0.0)
 		.setMinimumValue(0.0, true);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getShortDescription() {
 		return "similarity score weighted by centerpoint degree";
@@ -64,12 +58,10 @@ extends DriverBase<K, VV, EV> {
 
 	@Override
 	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
-		int lp = littleParallelism.getValue().intValue();
-
 		return graph
 			.run(new org.apache.flink.graph.library.similarity.AdamicAdar<K, VV, EV>()
 				.setMinimumRatio(minRatio.getValue().floatValue())
 				.setMinimumScore(minScore.getValue().floatValue())
-				.setLittleParallelism(lp));
+				.setParallelism(parallelism.getValue().intValue()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
index 14e953a..e10208d 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/ClusteringCoefficient.java
@@ -23,7 +23,6 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
-import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.CopyableValue;
 
 import org.apache.commons.lang3.text.StrBuilder;
@@ -31,8 +30,6 @@ import org.apache.commons.lang3.text.WordUtils;
 
 import java.io.PrintStream;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Driver for directed and undirected clustering coefficient algorithm and analytics.
  *
@@ -53,9 +50,6 @@ extends DriverBase<K, VV, EV> {
 	private ChoiceParameter order = new ChoiceParameter(this, "order")
 		.addChoices(DIRECTED, UNDIRECTED);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> globalClusteringCoefficient;
 
 	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> averageClusteringCoefficient;
@@ -81,37 +75,37 @@ extends DriverBase<K, VV, EV> {
 
 	@Override
 	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
-		int lp = littleParallelism.getValue().intValue();
+		int parallelism = this.parallelism.getValue().intValue();
 
 		switch (order.getValue()) {
 			case DIRECTED:
 				globalClusteringCoefficient = graph
 					.run(new org.apache.flink.graph.library.clustering.directed.GlobalClusteringCoefficient<K, VV, EV>()
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 
 				averageClusteringCoefficient = graph
 					.run(new org.apache.flink.graph.library.clustering.directed.AverageClusteringCoefficient<K, VV, EV>()
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 
 				@SuppressWarnings("unchecked")
 				DataSet<PrintableResult> directedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
 					.run(new org.apache.flink.graph.library.clustering.directed.LocalClusteringCoefficient<K, VV, EV>()
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 				return directedResult;
 
 			case UNDIRECTED:
 				globalClusteringCoefficient = graph
 					.run(new org.apache.flink.graph.library.clustering.undirected.GlobalClusteringCoefficient<K, VV, EV>()
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 
 				averageClusteringCoefficient = graph
 					.run(new org.apache.flink.graph.library.clustering.undirected.AverageClusteringCoefficient<K, VV, EV>()
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 
 				@SuppressWarnings("unchecked")
 				DataSet<PrintableResult> undirectedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
 					.run(new org.apache.flink.graph.library.clustering.undirected.LocalClusteringCoefficient<K, VV, EV>()
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 				return undirectedResult;
 
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java
index 38e4ea84..5b5a684 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/DriverBase.java
@@ -18,10 +18,13 @@
 
 package org.apache.flink.graph.drivers;
 
+import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.graph.drivers.parameter.ParameterizedBase;
 
 import java.io.PrintStream;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Base class for example drivers.
  *
@@ -33,6 +36,9 @@ public abstract class DriverBase<K, VV, EV>
 extends ParameterizedBase
 implements Driver<K, VV, EV> {
 
+	protected LongParameter parallelism = new LongParameter(this, "__parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
 	@Override
 	public String getName() {
 		return this.getClass().getSimpleName();

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
index 287e222..563908c 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/EdgeList.java
@@ -55,7 +55,8 @@ extends DriverBase<K, VV, EV> {
 		if (hasNullValueEdges(edges)) {
 			return edges
 				.map(new EdgeToTuple2Map<K, EV>())
-				.name("Edge to Tuple2");
+				.name("Edge to Tuple2")
+				.setParallelism(parallelism.getValue().intValue());
 		} else {
 			return edges;
 		}

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
index f4da13c..61ea60d 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/GraphMetrics.java
@@ -85,18 +85,22 @@ extends DriverBase<K, VV, EV> {
 		switch (order.getValue()) {
 			case DIRECTED:
 				vertexMetrics = graph
-					.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<K, VV, EV>());
+					.run(new org.apache.flink.graph.library.metric.directed.VertexMetrics<K, VV, EV>()
+						.setParallelism(parallelism.getValue().intValue()));
 
 				edgeMetrics = graph
-					.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<K, VV, EV>());
+					.run(new org.apache.flink.graph.library.metric.directed.EdgeMetrics<K, VV, EV>()
+						.setParallelism(parallelism.getValue().intValue()));
 				break;
 
 			case UNDIRECTED:
 				vertexMetrics = graph
-					.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<K, VV, EV>());
+					.run(new org.apache.flink.graph.library.metric.undirected.VertexMetrics<K, VV, EV>()
+						.setParallelism(parallelism.getValue().intValue()));
 
 				edgeMetrics = graph
-					.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<K, VV, EV>());
+					.run(new org.apache.flink.graph.library.metric.undirected.EdgeMetrics<K, VV, EV>()
+						.setParallelism(parallelism.getValue().intValue()));
 				break;
 		}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
index 1987421..3c83c4e 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/HITS.java
@@ -55,7 +55,8 @@ extends DriverBase<K, VV, EV> {
 	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
 		return graph
 			.run(new org.apache.flink.graph.library.linkanalysis.HITS<K, VV, EV>(
-				iterationConvergence.getValue().iterations,
-				iterationConvergence.getValue().convergenceThreshold));
+					iterationConvergence.getValue().iterations,
+					iterationConvergence.getValue().convergenceThreshold)
+				.setParallelism(parallelism.getValue().intValue()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
index 8f6cfb7..7c1639a 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/JaccardIndex.java
@@ -27,8 +27,6 @@ import org.apache.flink.types.CopyableValue;
 import org.apache.commons.lang3.text.StrBuilder;
 import org.apache.commons.lang3.text.WordUtils;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Driver for {@link org.apache.flink.graph.library.similarity.JaccardIndex}.
  */
@@ -53,9 +51,6 @@ extends DriverBase<K, VV, EV> {
 
 	private BooleanParameter mirrorResults = new BooleanParameter(this, "mirror_results");
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getShortDescription() {
 		return "similarity score as fraction of common neighbors";
@@ -76,13 +71,11 @@ extends DriverBase<K, VV, EV> {
 
 	@Override
 	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
-		int lp = littleParallelism.getValue().intValue();
-
 		return graph
 			.run(new org.apache.flink.graph.library.similarity.JaccardIndex<K, VV, EV>()
 				.setMinimumScore(minNumerator.getValue().intValue(), minDenominator.getValue().intValue())
 				.setMaximumScore(maxNumerator.getValue().intValue(), maxDenominator.getValue().intValue())
 				.setMirrorResults(mirrorResults.getValue())
-				.setLittleParallelism(lp));
+				.setParallelism(parallelism.getValue().intValue()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
index 224dea8..299aeed 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/PageRank.java
@@ -60,8 +60,9 @@ extends DriverBase<K, VV, EV> {
 	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
 		return graph
 			.run(new org.apache.flink.graph.library.linkanalysis.PageRank<K, VV, EV>(
-				dampingFactor.getValue(),
-				iterationConvergence.getValue().iterations,
-				iterationConvergence.getValue().convergenceThreshold));
+					dampingFactor.getValue(),
+					iterationConvergence.getValue().iterations,
+					iterationConvergence.getValue().convergenceThreshold)
+				.setParallelism(parallelism.getValue().intValue()));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
index 86a61d5..4a7d230 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/TriangleListing.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.GraphAnalytic;
 import org.apache.flink.graph.asm.result.PrintableResult;
 import org.apache.flink.graph.drivers.parameter.BooleanParameter;
 import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
-import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.CopyableValue;
 
 import org.apache.commons.lang3.text.StrBuilder;
@@ -32,8 +31,6 @@ import org.apache.commons.lang3.text.WordUtils;
 
 import java.io.PrintStream;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Driver for directed and undirected triangle listing algorithm and analytic.
  *
@@ -56,9 +53,6 @@ extends DriverBase<K, VV, EV> {
 
 	private BooleanParameter computeTriadicCensus = new BooleanParameter(this, "triadic_census");
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	private GraphAnalytic<K, VV, EV, ? extends PrintableResult> triadicCensus;
 
 	@Override
@@ -79,35 +73,35 @@ extends DriverBase<K, VV, EV> {
 
 	@Override
 	public DataSet plan(Graph<K, VV, EV> graph) throws Exception {
-		int lp = littleParallelism.getValue().intValue();
+		int parallelism = this.parallelism.getValue().intValue();
 
 		switch (order.getValue()) {
 			case DIRECTED:
 				if (computeTriadicCensus.getValue()) {
 					triadicCensus = graph
 						.run(new org.apache.flink.graph.library.clustering.directed.TriadicCensus<K, VV, EV>()
-							.setLittleParallelism(lp));
+							.setParallelism(parallelism));
 				}
 
 				@SuppressWarnings("unchecked")
 				DataSet<PrintableResult> directedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
 					.run(new org.apache.flink.graph.library.clustering.directed.TriangleListing<K, VV, EV>()
 						.setSortTriangleVertices(sortTriangleVertices.getValue())
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 				return directedResult;
 
 			case UNDIRECTED:
 				if (computeTriadicCensus.getValue()) {
 					triadicCensus = graph
 						.run(new org.apache.flink.graph.library.clustering.undirected.TriadicCensus<K, VV, EV>()
-							.setLittleParallelism(lp));
+							.setParallelism(parallelism));
 				}
 
 				@SuppressWarnings("unchecked")
 				DataSet<PrintableResult> undirectedResult = (DataSet<PrintableResult>) (DataSet<?>) graph
 					.run(new org.apache.flink.graph.library.clustering.undirected.TriangleListing<K, VV, EV>()
 						.setSortTriangleVertices(sortTriangleVertices.getValue())
-						.setLittleParallelism(lp));
+						.setParallelism(parallelism));
 				return undirectedResult;
 
 			default:

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
index a9d05a2..f7364ce 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CirculantGraph.java
@@ -32,7 +32,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.CirculantGraph.MINIMUM_VERTEX_COUNT;
 
 /**
@@ -46,9 +45,6 @@ extends GeneratedGraph<LongValue> {
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	private List<OffsetRange> offsetRanges = new ArrayList<>();
 
 	@Override
@@ -118,7 +114,7 @@ extends GeneratedGraph<LongValue> {
 		}
 
 		return graph
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
index 64bae73..d3a7e49 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CompleteGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.CompleteGraph.MINIMUM_VERTEX_COUNT;
 
 /**
@@ -36,9 +35,6 @@ extends GeneratedGraph<LongValue> {
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ")";
@@ -52,7 +48,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	protected Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
 		return new org.apache.flink.graph.generator.CompleteGraph(env, vertexCount.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
index d84cfca..632ad77 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/CycleGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.CycleGraph.MINIMUM_VERTEX_COUNT;
 
 /**
@@ -36,9 +35,6 @@ extends GeneratedGraph<LongValue> {
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
@@ -52,7 +48,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.CycleGraph(env, vertexCount.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
index 5ca2f2f..1eb0dd9 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EchoGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.EchoGraph.MINIMUM_VERTEX_COUNT;
 import static org.apache.flink.graph.generator.EchoGraph.MINIMUM_VERTEX_DEGREE;
 
@@ -40,9 +39,6 @@ extends GeneratedGraph<LongValue> {
 	private LongParameter vertexDegree = new LongParameter(this, "vertex_degree")
 		.setMinimumValue(MINIMUM_VERTEX_DEGREE);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount.getValue() + ":" + vertexDegree.getValue() + ")";
@@ -56,7 +52,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	protected Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
 		return new org.apache.flink.graph.generator.EchoGraph(env, vertexCount.getValue(), vertexDegree.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
index 6feb3c8..f54fcba 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/EmptyGraph.java
@@ -48,6 +48,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.EmptyGraph(env, vertexCount.getValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
index a0446ee..f6d8ae9 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GeneratedGraph.java
@@ -26,6 +26,7 @@ import org.apache.flink.graph.asm.translate.TranslateGraphIds;
 import org.apache.flink.graph.asm.translate.translators.LongValueToStringValue;
 import org.apache.flink.graph.asm.translate.translators.LongValueToUnsignedIntValue;
 import org.apache.flink.graph.drivers.parameter.ChoiceParameter;
+import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.CharValue;
 import org.apache.flink.types.LongValue;
@@ -34,6 +35,8 @@ import org.apache.flink.types.ShortValue;
 
 import org.apache.commons.lang3.text.WordUtils;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Base class for generated graphs.
  *
@@ -65,6 +68,9 @@ extends InputBase<K, NullValue, NullValue> {
 		.addChoices(LONG, STRING)
 		.addHiddenChoices(BYTE, NATIVE_BYTE, SHORT, NATIVE_SHORT, CHAR, NATIVE_CHAR, NATIVE_INTEGER, NATIVE_LONG, NATIVE_STRING);
 
+	protected LongParameter parallelism = new LongParameter(this, "__parallelism")
+		.setDefaultValue(PARALLELISM_DEFAULT);
+
 	/**
 	 * The vertex count is verified to be no greater than the capacity of the
 	 * selected data type. All vertices must be counted even if skipped or

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
index 1b6bac1..b92b175 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/GridGraph.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.api.java.utils.ParameterTool;
 import org.apache.flink.client.program.ProgramParametrizationException;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
@@ -32,8 +31,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.TreeMap;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Generate a {@link org.apache.flink.graph.generator.GridGraph}.
  */
@@ -44,9 +41,6 @@ extends GeneratedGraph<LongValue> {
 
 	private List<Dimension> dimensions = new ArrayList<>();
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getUsage() {
 		return "--" + PREFIX + "0 size:wrap_endpoints [--" + PREFIX + " size:wrap_endpoints [--" + PREFIX + " ...]] "
@@ -105,7 +99,7 @@ extends GeneratedGraph<LongValue> {
 		}
 
 		return graph
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
index 1be65bd..00b40c7 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/HypercubeGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.HypercubeGraph.MINIMUM_DIMENSIONS;
 
 /**
@@ -37,9 +36,6 @@ extends GeneratedGraph<LongValue> {
 		.setMinimumValue(MINIMUM_DIMENSIONS)
 		.setMaximumValue(63);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + dimensions + ")";
@@ -53,7 +49,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.HypercubeGraph(env, dimensions.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
index 7f3a3e5..aa0c18a 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/PathGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT;
 
 /**
@@ -36,9 +35,6 @@ extends GeneratedGraph<LongValue> {
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
@@ -52,7 +48,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.PathGraph(env, vertexCount.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
index cce7afa..f24b99f 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/RMatGraph.java
@@ -32,8 +32,6 @@ import org.apache.flink.types.StringValue;
 
 import org.apache.commons.math3.random.JDKRandomGenerator;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Generate an {@code RMatGraph} with {@link IntValue}, {@link LongValue},
  * or {@link StringValue} keys.
@@ -79,9 +77,6 @@ extends GeneratedMultiGraph<LongValue> {
 	private LongParameter seed = new LongParameter(this, "seed")
 		.setDefaultValue(JDKRandomGeneratorFactory.DEFAULT_SEED);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() +
@@ -95,9 +90,7 @@ extends GeneratedMultiGraph<LongValue> {
 
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) throws Exception {
-		int lp = littleParallelism.getValue().intValue();
-
-		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory();
+		RandomGenerableFactory<JDKRandomGenerator> rnd = new JDKRandomGeneratorFactory(seed.getValue());
 
 		long vertexCount = 1L << scale.getValue();
 		long edgeCount = vertexCount * edgeFactor.getValue();
@@ -106,7 +99,7 @@ extends GeneratedMultiGraph<LongValue> {
 				env, rnd, vertexCount, edgeCount)
 			.setConstants(a.getValue().floatValue(), b.getValue().floatValue(), c.getValue().floatValue())
 			.setNoise(noiseEnabled.getValue(), noise.getValue().floatValue())
-			.setParallelism(lp)
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
index 44da3f3..b0c7463 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/SingletonEdgeGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.PathGraph.MINIMUM_VERTEX_COUNT;
 
 /**
@@ -37,9 +36,6 @@ extends GeneratedGraph<LongValue> {
 		.setMinimumValue(MINIMUM_VERTEX_COUNT)
 		.setMaximumValue(1L << 62);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexPairCount + ")";
@@ -53,7 +49,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.SingletonEdgeGraph(env, vertexPairCount.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
index d488b59..cde25ce 100644
--- a/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
+++ b/flink-libraries/flink-gelly-examples/src/main/java/org/apache/flink/graph/drivers/input/StarGraph.java
@@ -24,7 +24,6 @@ import org.apache.flink.graph.drivers.parameter.LongParameter;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.types.NullValue;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 import static org.apache.flink.graph.generator.StarGraph.MINIMUM_VERTEX_COUNT;
 
 /**
@@ -36,9 +35,6 @@ extends GeneratedGraph<LongValue> {
 	private LongParameter vertexCount = new LongParameter(this, "vertex_count")
 		.setMinimumValue(MINIMUM_VERTEX_COUNT);
 
-	private LongParameter littleParallelism = new LongParameter(this, "little_parallelism")
-		.setDefaultValue(PARALLELISM_DEFAULT);
-
 	@Override
 	public String getIdentity() {
 		return getTypeName() + " " + getName() + " (" + vertexCount + ")";
@@ -52,7 +48,7 @@ extends GeneratedGraph<LongValue> {
 	@Override
 	public Graph<LongValue, NullValue, NullValue> generate(ExecutionEnvironment env) {
 		return new org.apache.flink.graph.generator.StarGraph(env, vertexCount.getValue())
-			.setParallelism(littleParallelism.getValue().intValue())
+			.setParallelism(parallelism.getValue().intValue())
 			.generate();
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java
index 2e7c5b2..9049268 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/GraphAnalyticBase.java
@@ -21,6 +21,8 @@ package org.apache.flink.graph;
 import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.util.Preconditions;
 
+import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
+
 /**
  * Base class for {@link GraphAnalytic}.
  *
@@ -34,6 +36,8 @@ implements GraphAnalytic<K, VV, EV, T> {
 
 	protected ExecutionEnvironment env;
 
+	protected int parallelism = PARALLELISM_DEFAULT;
+
 	@Override
 	public GraphAnalytic<K, VV, EV, T> run(Graph<K, VV, EV> input)
 			throws Exception {
@@ -41,6 +45,27 @@ implements GraphAnalytic<K, VV, EV, T> {
 		return this;
 	}
 
+	/**
+	 * Set the parallelism for this analytic's operators. This parameter is
+	 * necessary because processing a small amount of data with high operator
+	 * parallelism is slow and wasteful with memory and buffers.
+	 *
+	 * <p>Operator parallelism should be set to this given value unless
+	 * processing asymptotically more data, in which case the default job
+	 * parallelism should be inherited.
+	 *
+	 * @param parallelism operator parallelism
+	 * @return this
+	 */
+	public GraphAnalyticBase<K, VV, EV, T> setParallelism(int parallelism) {
+		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
+			"The parallelism must be greater than zero.");
+
+		this.parallelism = parallelism;
+
+		return this;
+	}
+
 	@Override
 	public T execute()
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
index 685031c..7191bc9 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeDegreesPair.java
@@ -27,11 +27,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeDegreeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of a directed graph with the degree, out-degree, and
@@ -44,37 +40,6 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class EdgeDegreesPair<K, VV, EV>
 extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, Degrees, Degrees>>> {
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeDegreesPair<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!EdgeDegreesPair.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
-
-		EdgeDegreesPair rhs = (EdgeDegreesPair) other;
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
-	}
-
 	@Override
 	public DataSet<Edge<K, Tuple3<EV, Degrees, Degrees>>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
index 0299839..30d30fa 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeSourceDegrees.java
@@ -26,11 +26,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of a directed graph with the degree, out-degree, and
@@ -43,37 +39,6 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class EdgeSourceDegrees<K, VV, EV>
 extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeSourceDegrees<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!EdgeSourceDegrees.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
-
-		EdgeSourceDegrees rhs = (EdgeSourceDegrees) other;
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
-	}
-
 	@Override
 	public DataSet<Edge<K, Tuple2<EV, Degrees>>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
index 7c13b39..57045a1 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/EdgeTargetDegrees.java
@@ -26,11 +26,7 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.Vertex;
 import org.apache.flink.graph.asm.degree.annotate.DegreeAnnotationFunctions.JoinEdgeWithVertexDegree;
 import org.apache.flink.graph.asm.degree.annotate.directed.VertexDegrees.Degrees;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of a directed graph with the degree, out-degree, and
@@ -43,37 +39,6 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class EdgeTargetDegrees<K, VV, EV>
 extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, Degrees>>> {
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeTargetDegrees<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!EdgeTargetDegrees.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
-
-		EdgeTargetDegrees rhs = (EdgeTargetDegrees) other;
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
-	}
-
 	@Override
 	public DataSet<Edge<K, Tuple2<EV, Degrees>>> runInternal(Graph<K, VV, EV> input)
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
index b0cb7d7..06a7fd2 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexDegrees.java
@@ -39,9 +39,6 @@ import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.ByteValue;
 import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates vertices of a directed graph with the degree, out-, and in-degree.
@@ -56,8 +53,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 	// Optional configuration
 	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default only the edge set is processed for the computation of degree.
 	 * When this flag is set an additional join is performed against the vertex
@@ -73,41 +68,24 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, Degrees>> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public VertexDegrees<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!VertexDegrees.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		VertexDegrees rhs = (VertexDegrees) other;
 
-		// verify that configurations can be merged
+		return !includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices);
+	}
 
-		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		VertexDegrees rhs = (VertexDegrees) other;
 
 		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
index 94c2667..dc071cf 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexInDegree.java
@@ -29,9 +29,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates vertices of a directed graph with the in-degree.
@@ -46,8 +43,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 	// Optional configuration
 	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default only the edge set is processed for the computation of degree.
 	 * When this flag is set an additional join is performed against the vertex
@@ -63,44 +58,24 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public VertexInDegree<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!VertexInDegree.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		VertexInDegree rhs = (VertexInDegree) other;
 
-		// verify that configurations can be merged
+		return !includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices);
+	}
 
-		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		VertexInDegree rhs = (VertexInDegree) other;
 
 		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
index 00f2f89..4a4689b 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/directed/VertexOutDegree.java
@@ -29,9 +29,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates vertices of a directed graph with the out-degree.
@@ -46,8 +43,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 	// Optional configuration
 	private OptionalBoolean includeZeroDegreeVertices = new OptionalBoolean(false, true);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default only the edge set is processed for the computation of degree.
 	 * When this flag is set an additional join is performed against the vertex
@@ -63,44 +58,24 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public VertexOutDegree<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!VertexOutDegree.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		VertexOutDegree rhs = (VertexOutDegree) other;
 
-		// verify that configurations can be merged
+		return !includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices);
+	}
 
-		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		VertexOutDegree rhs = (VertexOutDegree) other;
 
 		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
index 6228cac..ff4285f 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeDegreePair.java
@@ -30,9 +30,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of an undirected graph with the degree of both the source
@@ -48,8 +45,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, L
 	// Optional configuration
 	private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * The degree can be counted from either the edge source or target IDs.
 	 * By default the source IDs are counted. Reducing on target IDs may
@@ -65,36 +60,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple3<EV, LongValue, L
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeDegreePair<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		EdgeDegreePair rhs = (EdgeDegreePair) other;
 
 		reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
index 01ff7d0..bd8ce3d 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeSourceDegree.java
@@ -29,9 +29,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of an undirected graph with degree of the source vertex.
@@ -46,8 +43,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 	// Optional configuration
 	private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * The degree can be counted from either the edge source or target IDs.
 	 * By default the source IDs are counted. Reducing on target IDs may
@@ -63,36 +58,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeSourceDegree<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		EdgeSourceDegree rhs = (EdgeSourceDegree) other;
 
 		reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
index d3316ea..cb18d2c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/EdgeTargetDegree.java
@@ -29,9 +29,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates edges of an undirected graph with degree of the target vertex.
@@ -46,8 +43,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 	// Optional configuration
 	private OptionalBoolean reduceOnSourceId = new OptionalBoolean(false, false);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * The degree can be counted from either the edge source or target IDs.
 	 * By default the target IDs are counted. Reducing on source IDs may
@@ -63,36 +58,13 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Edge<K, Tuple2<EV, LongValue>>>
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public EdgeTargetDegree<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!EdgeSourceDegree.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
 		EdgeTargetDegree rhs = (EdgeTargetDegree) other;
 
 		reduceOnSourceId.mergeWith(rhs.reduceOnSourceId);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
index 626c11b..d2fad18 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/annotate/undirected/VertexDegree.java
@@ -32,9 +32,6 @@ import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingDataSet;
 import org.apache.flink.graph.utils.proxy.OptionalBoolean;
 import org.apache.flink.types.LongValue;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Annotates vertices of an undirected graph with the degree.
@@ -51,8 +48,6 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 
 	private OptionalBoolean reduceOnTargetId = new OptionalBoolean(false, false);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * By default only the edge set is processed for the computation of degree.
 	 * When this flag is set an additional join is performed against the vertex
@@ -83,45 +78,25 @@ extends GraphAlgorithmWrappingDataSet<K, VV, EV, Vertex<K, LongValue>> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public VertexDegree<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!VertexDegree.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		VertexDegree rhs = (VertexDegree) other;
 
-		// verify that configurations can be merged
+		return !includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices);
+	}
 
-		if (includeZeroDegreeVertices.conflictsWith(rhs.includeZeroDegreeVertices)) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		VertexDegree rhs = (VertexDegree) other;
 
 		includeZeroDegreeVertices.mergeWith(rhs.includeZeroDegreeVertices);
 		reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
index b485507..522d39c 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/degree/filter/undirected/MaximumDegree.java
@@ -36,8 +36,6 @@ import org.apache.flink.types.LongValue;
 import org.apache.flink.util.Collector;
 import org.apache.flink.util.Preconditions;
 
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
-
 /**
  * Removes vertices from a graph with degree greater than the given maximum.
  * Any edge with with a source or target vertex with degree greater than the
@@ -58,8 +56,6 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
 	private OptionalBoolean broadcastHighDegreeVertices = new OptionalBoolean(false, false);
 
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Filter out vertices with degree greater than the given maximum.
 	 *
@@ -103,42 +99,25 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 		return this;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public MaximumDegree<K, VV, EV> setParallelism(int parallelism) {
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!MaximumDegree.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		MaximumDegree rhs = (MaximumDegree) other;
 
-		// verify that configurations can be merged
+		return maximumDegree == rhs.maximumDegree;
+	}
 
-		if (maximumDegree != rhs.maximumDegree) {
-			return false;
-		}
+	@Override
+	protected void mergeConfiguration(GraphAlgorithmWrappingBase other) {
+		super.mergeConfiguration(other);
 
-		// merge configurations
+		MaximumDegree rhs = (MaximumDegree) other;
 
 		reduceOnTargetId.mergeWith(rhs.reduceOnTargetId);
 		broadcastHighDegreeVertices.mergeWith(rhs.broadcastHighDegreeVertices);
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
 	}
 
 	/*

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
index 0d60cd9..1bab9c6 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/directed/Simplify.java
@@ -22,11 +22,7 @@ import org.apache.flink.api.common.functions.FilterFunction;
 import org.apache.flink.api.java.DataSet;
 import org.apache.flink.graph.Edge;
 import org.apache.flink.graph.Graph;
-import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Remove self-loops and duplicate edges from a directed graph.
@@ -38,40 +34,6 @@ import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 public class Simplify<K extends Comparable<K>, VV, EV>
 extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public Simplify<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
-	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!Simplify.class.isAssignableFrom(other.getClass())) {
-			return false;
-		}
-
-		Simplify rhs = (Simplify) other;
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
-	}
-
 	@Override
 	public Graph<K, VV, EV> runInternal(Graph<K, VV, EV> input)
 			throws Exception {

http://git-wip-us.apache.org/repos/asf/flink/blob/d0cc2c17/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
----------------------------------------------------------------------
diff --git a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
index dd3a9b3..6f1e282 100644
--- a/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
+++ b/flink-libraries/flink-gelly/src/main/java/org/apache/flink/graph/asm/simple/undirected/Simplify.java
@@ -25,9 +25,6 @@ import org.apache.flink.graph.Graph;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingBase;
 import org.apache.flink.graph.utils.proxy.GraphAlgorithmWrappingGraph;
 import org.apache.flink.util.Collector;
-import org.apache.flink.util.Preconditions;
-
-import static org.apache.flink.api.common.ExecutionConfig.PARALLELISM_DEFAULT;
 
 /**
  * Add symmetric edges and remove self-loops and duplicate edges from an
@@ -43,9 +40,6 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 	// Required configuration
 	private boolean clipAndFlip;
 
-	// Optional configuration
-	private int parallelism = PARALLELISM_DEFAULT;
-
 	/**
 	 * Simplifies an undirected graph by adding reverse edges and removing
 	 * self-loops and duplicate edges.
@@ -59,43 +53,15 @@ extends GraphAlgorithmWrappingGraph<K, VV, EV, K, VV, EV> {
 		this.clipAndFlip = clipAndFlip;
 	}
 
-	/**
-	 * Override the operator parallelism.
-	 *
-	 * @param parallelism operator parallelism
-	 * @return this
-	 */
-	public Simplify<K, VV, EV> setParallelism(int parallelism) {
-		Preconditions.checkArgument(parallelism > 0 || parallelism == PARALLELISM_DEFAULT,
-			"The parallelism must be greater than zero.");
-
-		this.parallelism = parallelism;
-
-		return this;
-	}
-
 	@Override
-	protected boolean mergeConfiguration(GraphAlgorithmWrappingBase other) {
-		Preconditions.checkNotNull(other);
-
-		if (!Simplify.class.isAssignableFrom(other.getClass())) {
+	protected boolean canMergeConfigurationWith(GraphAlgorithmWrappingBase other) {
+		if (!super.canMergeConfigurationWith(other)) {
 			return false;
 		}
 
 		Simplify rhs = (Simplify) other;
 
-		// verify that configurations can be merged
-
-		if (clipAndFlip != rhs.clipAndFlip) {
-			return false;
-		}
-
-		// merge configurations
-
-		parallelism = (parallelism == PARALLELISM_DEFAULT) ? rhs.parallelism :
-			((rhs.parallelism == PARALLELISM_DEFAULT) ? parallelism : Math.min(parallelism, rhs.parallelism));
-
-		return true;
+		return clipAndFlip == rhs.clipAndFlip;
 	}
 
 	@Override