You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/12 22:25:26 UTC

[05/14] flink git commit: [FLINK-8342] [flip6] Remove generic type parameter from ClusterDescriptor

[FLINK-8342] [flip6] Remove generic type parameter from ClusterDescriptor

This closes #5228.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10e900b2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10e900b2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10e900b2

Branch: refs/heads/master
Commit: 10e900b25ac03876d3f9e78f260d48efe6b9d853
Parents: d7e9dc1
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jan 2 10:19:34 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 12 16:14:03 2018 +0100

----------------------------------------------------------------------
 .../client/cli/AbstractCustomCommandLine.java   |  4 +--
 .../apache/flink/client/cli/CliFrontend.java    | 34 ++++++++++----------
 .../flink/client/cli/CliFrontendParser.java     | 14 ++++----
 .../flink/client/cli/CustomCommandLine.java     |  5 ++-
 .../org/apache/flink/client/cli/DefaultCLI.java |  5 ++-
 .../flink/client/cli/Flip6DefaultCLI.java       |  5 ++-
 .../client/deployment/ClusterDescriptor.java    |  8 ++---
 .../Flip6StandaloneClusterDescriptor.java       |  2 +-
 .../deployment/StandaloneClusterDescriptor.java |  2 +-
 .../apache/flink/client/cli/DefaultCLITest.java |  4 +--
 .../client/cli/util/DummyClusterDescriptor.java | 14 ++++----
 .../client/cli/util/DummyCustomCommandLine.java | 12 +++----
 .../client/cli/util/MockedCliFrontend.java      |  2 +-
 .../YARNSessionCapacitySchedulerITCase.java     |  2 +-
 .../flink/yarn/YARNSessionFIFOITCase.java       |  4 +--
 .../yarn/AbstractYarnClusterDescriptor.java     |  4 +--
 .../flink/yarn/cli/FlinkYarnSessionCli.java     |  2 +-
 17 files changed, 57 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
index c7a1672..da21556 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
@@ -19,7 +19,6 @@
 package org.apache.flink.client.cli;
 
 import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.HighAvailabilityOptions;
 import org.apache.flink.configuration.UnmodifiableConfiguration;
@@ -38,9 +37,8 @@ import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConf
  * Base class for {@link CustomCommandLine} implementations which specify a JobManager address and
  * a ZooKeeper namespace.
  *
- * @param <C> type of the ClusterClient which is returned
  */
-public abstract class AbstractCustomCommandLine<C extends ClusterClient> implements CustomCommandLine<C> {
+public abstract class AbstractCustomCommandLine implements CustomCommandLine {
 
 	protected final Option zookeeperNamespaceOption = new Option("z", "zookeeperNamespace", true,
 		"Namespace to create the Zookeeper sub-paths for high availability mode");

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 630154c..453d086 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -106,7 +106,7 @@ public class CliFrontend {
 
 	private final Configuration configuration;
 
-	private final List<CustomCommandLine<?>> customCommandLines;
+	private final List<CustomCommandLine> customCommandLines;
 
 	private final Options customCommandLineOptions;
 
@@ -116,7 +116,7 @@ public class CliFrontend {
 
 	public CliFrontend(
 			Configuration configuration,
-			List<CustomCommandLine<?>> customCommandLines) throws Exception {
+			List<CustomCommandLine> customCommandLines) throws Exception {
 		this.configuration = Preconditions.checkNotNull(configuration);
 		this.customCommandLines = Preconditions.checkNotNull(customCommandLines);
 
@@ -129,7 +129,7 @@ public class CliFrontend {
 
 		this.customCommandLineOptions = new Options();
 
-		for (CustomCommandLine<?> customCommandLine : customCommandLines) {
+		for (CustomCommandLine customCommandLine : customCommandLines) {
 			customCommandLine.addGeneralOptions(customCommandLineOptions);
 			customCommandLine.addRunOptions(customCommandLineOptions);
 		}
@@ -196,9 +196,9 @@ public class CliFrontend {
 			throw new CliArgsException("Could not build the program from JAR file.", e);
 		}
 
-		final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
 
-		final ClusterDescriptor<?> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
+		final ClusterDescriptor clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
 
 		try {
 			final String clusterId = customCommandLine.getClusterId(commandLine);
@@ -351,8 +351,8 @@ public class CliFrontend {
 			scheduled = true;
 		}
 
-		final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
-		final ClusterDescriptor<?> clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
+		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
+		final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
 
 		final String clusterId = activeCommandLine.getClusterId(commandLine);
 
@@ -473,9 +473,9 @@ public class CliFrontend {
 			throw new CliArgsException("Missing JobID");
 		}
 
-		final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
 
-		final ClusterDescriptor<?> clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
+		final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
 
 		final String clusterId = activeCommandLine.getClusterId(commandLine);
 
@@ -553,9 +553,9 @@ public class CliFrontend {
 			throw new CliArgsException("Missing JobID in the command line arguments.");
 		}
 
-		final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
+		final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
 
-		final ClusterDescriptor<?> clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
+		final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
 
 		final String clusterId = activeCommandLine.getClusterId(commandLine);
 
@@ -617,9 +617,9 @@ public class CliFrontend {
 			return;
 		}
 
-		CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
+		CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
 
-		final ClusterDescriptor<?> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
+		final ClusterDescriptor clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
 
 		final String clusterId = customCommandLine.getClusterId(commandLine);
 
@@ -972,7 +972,7 @@ public class CliFrontend {
 		final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
 
 		// 3. load the custom command lines
-		final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines(
+		final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
 			configuration,
 			configurationDirectory);
 
@@ -1039,8 +1039,8 @@ public class CliFrontend {
 		config.setInteger(JobManagerOptions.PORT, address.getPort());
 	}
 
-	public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
-		List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);
+	public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
+		List<CustomCommandLine> customCommandLines = new ArrayList<>(2);
 
 		//	Command line interface of the YARN session, with a special initialization here
 		//	to prefix all options with y/yarn.
@@ -1087,7 +1087,7 @@ public class CliFrontend {
 	 * @param className The fully-qualified class name to load.
 	 * @param params The constructor parameters
 	 */
-	private static CustomCommandLine<?> loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {
+	private static CustomCommandLine loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {
 
 		Class<? extends CustomCommandLine> customCliClass =
 			Class.forName(className).asSubclass(CustomCommandLine.class);

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 475d854..10507d6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -236,7 +236,7 @@ public class CliFrontendParser {
 	/**
 	 * Prints the help for the client.
 	 */
-	public static void printHelp(Collection<CustomCommandLine<?>> customCommandLines) {
+	public static void printHelp(Collection<CustomCommandLine> customCommandLines) {
 		System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
 		System.out.println();
 		System.out.println("The following actions are available:");
@@ -251,7 +251,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForRun(Collection<CustomCommandLine<?>> customCommandLines) {
+	public static void printHelpForRun(Collection<CustomCommandLine> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -279,7 +279,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForList(Collection<CustomCommandLine<?>> customCommandLines) {
+	public static void printHelpForList(Collection<CustomCommandLine> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -294,7 +294,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForStop(Collection<CustomCommandLine<?>> customCommandLines) {
+	public static void printHelpForStop(Collection<CustomCommandLine> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -309,7 +309,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForCancel(Collection<CustomCommandLine<?>> customCommandLines) {
+	public static void printHelpForCancel(Collection<CustomCommandLine> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -324,7 +324,7 @@ public class CliFrontendParser {
 		System.out.println();
 	}
 
-	public static void printHelpForSavepoint(Collection<CustomCommandLine<?>> customCommandLines) {
+	public static void printHelpForSavepoint(Collection<CustomCommandLine> customCommandLines) {
 		HelpFormatter formatter = new HelpFormatter();
 		formatter.setLeftPadding(5);
 		formatter.setWidth(80);
@@ -345,7 +345,7 @@ public class CliFrontendParser {
 	 * @param runOptions True if the run options should be printed, False to print only general options
 	 */
 	private static void printCustomCliOptions(
-			Collection<CustomCommandLine<?>> customCommandLines,
+			Collection<CustomCommandLine> customCommandLines,
 			HelpFormatter formatter,
 			boolean runOptions) {
 		// prints options from all available command-line classes

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index f642484..aabc224 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.cli;
 
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.util.FlinkException;
 
 import org.apache.commons.cli.CommandLine;
@@ -31,7 +30,7 @@ import javax.annotation.Nullable;
 /**
  * Custom command-line interface to load hooks for the command-line interface.
  */
-public interface CustomCommandLine<ClusterType extends ClusterClient> {
+public interface CustomCommandLine {
 
 	/**
 	 * Signals whether the custom command-line wants to execute or not.
@@ -67,7 +66,7 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
 	 * @return ClusterDescriptor
 	 * @throws FlinkException if the ClusterDescriptor could not be created
 	 */
-	ClusterDescriptor<ClusterType> createClusterDescriptor(CommandLine commandLine) throws FlinkException;
+	ClusterDescriptor createClusterDescriptor(CommandLine commandLine) throws FlinkException;
 
 	/**
 	 * Returns the cluster id if a cluster id was specified on the command line, otherwise it

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index c29c5b7..5660765 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -21,7 +21,6 @@ package org.apache.flink.client.cli;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
-import org.apache.flink.client.program.StandaloneClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FlinkException;
 
@@ -32,7 +31,7 @@ import javax.annotation.Nullable;
 /**
  * The default CLI which is used for interaction with standalone clusters.
  */
-public class DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterClient> {
+public class DefaultCLI extends AbstractCustomCommandLine {
 
 	public DefaultCLI(Configuration configuration) {
 		super(configuration);
@@ -50,7 +49,7 @@ public class DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterClien
 	}
 
 	@Override
-	public ClusterDescriptor<StandaloneClusterClient> createClusterDescriptor(
+	public ClusterDescriptor createClusterDescriptor(
 			CommandLine commandLine) throws FlinkException {
 		final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
index 3adeca6..1a75aac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
@@ -21,7 +21,6 @@ package org.apache.flink.client.cli;
 import org.apache.flink.client.deployment.ClusterDescriptor;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.deployment.Flip6StandaloneClusterDescriptor;
-import org.apache.flink.client.program.rest.RestClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.util.FlinkException;
 
@@ -34,7 +33,7 @@ import javax.annotation.Nullable;
 /**
  * The default CLI which is used for interaction with standalone clusters.
  */
-public class Flip6DefaultCLI extends AbstractCustomCommandLine<RestClusterClient> {
+public class Flip6DefaultCLI extends AbstractCustomCommandLine {
 
 	public static final Option FLIP_6 = new Option("flip6", "Switches the client to Flip-6 mode.");
 
@@ -63,7 +62,7 @@ public class Flip6DefaultCLI extends AbstractCustomCommandLine<RestClusterClient
 	}
 
 	@Override
-	public ClusterDescriptor<RestClusterClient> createClusterDescriptor(
+	public ClusterDescriptor createClusterDescriptor(
 			CommandLine commandLine) throws FlinkException {
 		final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine);
 

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index 1603930..07eeabc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 /**
  * A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a Client for Cluster communication.
  */
-public interface ClusterDescriptor<ClientType extends ClusterClient> extends AutoCloseable {
+public interface ClusterDescriptor extends AutoCloseable {
 
 	/**
 	 * Returns a String containing details about the cluster (NodeManagers, available memory, ...).
@@ -38,7 +38,7 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> extends Aut
 	 * @return Client for the cluster
 	 * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
 	 */
-	ClientType retrieve(String applicationID) throws UnsupportedOperationException;
+	ClusterClient retrieve(String applicationID) throws UnsupportedOperationException;
 
 	/**
 	 * Triggers deployment of a cluster.
@@ -46,7 +46,7 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> extends Aut
 	 * @return Client for the cluster
 	 * @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
 	 */
-	ClientType deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException;
+	ClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException;
 
 	/**
 	 * Deploys a per-job cluster with the given job on the cluster.
@@ -56,7 +56,7 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> extends Aut
 	 * @return Cluster client to talk to the Flink cluster
 	 * @throws ClusterDeploymentException if the cluster could not be deployed
 	 */
-	ClientType deployJobCluster(
+	ClusterClient deployJobCluster(
 		final ClusterSpecification clusterSpecification,
 		final JobGraph jobGraph);
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
index b8eb534..a35a68b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
@@ -27,7 +27,7 @@ import org.apache.flink.util.Preconditions;
 /**
  * A deployment descriptor for an existing cluster.
  */
-public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor<RestClusterClient> {
+public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor {
 
 	private final Configuration config;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 3808efa..ae25194 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
 /**
  * A deployment descriptor for an existing cluster.
  */
-public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterClient> {
+public class StandaloneClusterDescriptor implements ClusterDescriptor {
 
 	private final Configuration config;
 

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
index e73b9c9..6eb005d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
@@ -62,7 +62,7 @@ public class DefaultCLITest extends TestLogger {
 
 		final InetSocketAddress expectedAddress = new InetSocketAddress(localhost, port);
 
-		final ClusterDescriptor<?> clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
+		final ClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
 
 		final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
 
@@ -89,7 +89,7 @@ public class DefaultCLITest extends TestLogger {
 
 		CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
 
-		final ClusterDescriptor<?> clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
+		final ClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
 
 		final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
 

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
index e46957b..d300055 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
@@ -26,14 +26,12 @@ import org.apache.flink.util.Preconditions;
 
 /**
  * Dummy {@link ClusterDescriptor} implementation for testing purposes.
- *
- * @param <C> type of the returned {@link ClusterClient}
  */
-public class DummyClusterDescriptor<C extends ClusterClient> implements ClusterDescriptor<C> {
+public class DummyClusterDescriptor implements ClusterDescriptor {
 
-	private final C clusterClient;
+	private final ClusterClient clusterClient;
 
-	public DummyClusterDescriptor(C clusterClient) {
+	public DummyClusterDescriptor(ClusterClient clusterClient) {
 		this.clusterClient = Preconditions.checkNotNull(clusterClient);
 	}
 
@@ -43,17 +41,17 @@ public class DummyClusterDescriptor<C extends ClusterClient> implements ClusterD
 	}
 
 	@Override
-	public C retrieve(String applicationID) throws UnsupportedOperationException {
+	public ClusterClient retrieve(String applicationID) throws UnsupportedOperationException {
 		return clusterClient;
 	}
 
 	@Override
-	public C deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException {
+	public ClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException {
 		return clusterClient;
 	}
 
 	@Override
-	public C deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
+	public ClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
 		return clusterClient;
 	}
 

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
index a36e8e9..5279d85 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
@@ -31,13 +31,11 @@ import javax.annotation.Nullable;
 
 /**
  * Dummy implementation of the {@link CustomCommandLine} for testing purposes.
- *
- * @param <T> type of the returned cluster client
  */
-public class DummyCustomCommandLine<T extends ClusterClient> implements CustomCommandLine<T> {
-	private final T clusterClient;
+public class DummyCustomCommandLine implements CustomCommandLine {
+	private final ClusterClient clusterClient;
 
-	public DummyCustomCommandLine(T clusterClient) {
+	public DummyCustomCommandLine(ClusterClient clusterClient) {
 		this.clusterClient = Preconditions.checkNotNull(clusterClient);
 	}
 
@@ -62,8 +60,8 @@ public class DummyCustomCommandLine<T extends ClusterClient> implements CustomCo
 	}
 
 	@Override
-	public ClusterDescriptor<T> createClusterDescriptor(CommandLine commandLine) {
-		return new DummyClusterDescriptor<>(clusterClient);
+	public ClusterDescriptor createClusterDescriptor(CommandLine commandLine) {
+		return new DummyClusterDescriptor(clusterClient);
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java
index 00b460f..477293d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java
@@ -34,6 +34,6 @@ public class MockedCliFrontend extends CliFrontend {
 	public MockedCliFrontend(ClusterClient clusterClient) throws Exception {
 		super(
 			new Configuration(),
-			Collections.singletonList(new DummyCustomCommandLine<>(clusterClient)));
+			Collections.singletonList(new DummyCustomCommandLine(clusterClient)));
 	}
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index f347f94..5bed22e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -365,7 +365,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
 	@Test
 	public void testNonexistingQueueWARNmessage() {
 		LOG.info("Starting testNonexistingQueueWARNmessage()");
-		addTestAppender(YarnClusterDescriptor.class, Level.WARN);
+		addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
 				"-t", flinkLibFolder.getAbsolutePath(),
 				"-n", "1",

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index d5a9883..cc26350 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -179,7 +179,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	@Ignore("The test is too resource consuming (8.5 GB of memory)")
 	@Test
 	public void testResourceComputation() {
-		addTestAppender(YarnClusterDescriptor.class, Level.WARN);
+		addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
 		LOG.info("Starting testResourceComputation()");
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 				"-n", "5",
@@ -207,7 +207,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
 	@Ignore("The test is too resource consuming (8 GB of memory)")
 	@Test
 	public void testfullAlloc() {
-		addTestAppender(YarnClusterDescriptor.class, Level.WARN);
+		addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
 		LOG.info("Starting testfullAlloc()");
 		runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
 				"-n", "2",

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 0372319..0a977df 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -99,8 +99,8 @@ import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties
 /**
  * The descriptor with deployment information for spawning or resuming a {@link YarnClusterClient}.
  */
-public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor<YarnClusterClient> {
-	private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
+public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor {
+	private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnClusterDescriptor.class);
 
 	/**
 	 * Minimum memory requirements, checked by the Client.

http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index d4ab41f..d797f47 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -81,7 +81,7 @@ import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_
 /**
  * Class handling the command line interface to the YARN session.
  */
-public class FlinkYarnSessionCli extends AbstractCustomCommandLine<YarnClusterClient> {
+public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
 	private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
 
 	//------------------------------------ Constants   -------------------------