You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/12 22:25:26 UTC
[05/14] flink git commit: [FLINK-8342] [flip6] Remove generic type parameter from ClusterDescriptor
[FLINK-8342] [flip6] Remove generic type parameter from ClusterDescriptor
This closes #5228.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/10e900b2
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/10e900b2
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/10e900b2
Branch: refs/heads/master
Commit: 10e900b25ac03876d3f9e78f260d48efe6b9d853
Parents: d7e9dc1
Author: Till Rohrmann <tr...@apache.org>
Authored: Tue Jan 2 10:19:34 2018 +0100
Committer: Till Rohrmann <tr...@apache.org>
Committed: Fri Jan 12 16:14:03 2018 +0100
----------------------------------------------------------------------
.../client/cli/AbstractCustomCommandLine.java | 4 +--
.../apache/flink/client/cli/CliFrontend.java | 34 ++++++++++----------
.../flink/client/cli/CliFrontendParser.java | 14 ++++----
.../flink/client/cli/CustomCommandLine.java | 5 ++-
.../org/apache/flink/client/cli/DefaultCLI.java | 5 ++-
.../flink/client/cli/Flip6DefaultCLI.java | 5 ++-
.../client/deployment/ClusterDescriptor.java | 8 ++---
.../Flip6StandaloneClusterDescriptor.java | 2 +-
.../deployment/StandaloneClusterDescriptor.java | 2 +-
.../apache/flink/client/cli/DefaultCLITest.java | 4 +--
.../client/cli/util/DummyClusterDescriptor.java | 14 ++++----
.../client/cli/util/DummyCustomCommandLine.java | 12 +++----
.../client/cli/util/MockedCliFrontend.java | 2 +-
.../YARNSessionCapacitySchedulerITCase.java | 2 +-
.../flink/yarn/YARNSessionFIFOITCase.java | 4 +--
.../yarn/AbstractYarnClusterDescriptor.java | 4 +--
.../flink/yarn/cli/FlinkYarnSessionCli.java | 2 +-
17 files changed, 57 insertions(+), 66 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
index c7a1672..da21556 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/AbstractCustomCommandLine.java
@@ -19,7 +19,6 @@
package org.apache.flink.client.cli;
import org.apache.flink.client.ClientUtils;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.HighAvailabilityOptions;
import org.apache.flink.configuration.UnmodifiableConfiguration;
@@ -38,9 +37,8 @@ import static org.apache.flink.client.cli.CliFrontend.setJobManagerAddressInConf
* Base class for {@link CustomCommandLine} implementations which specify a JobManager address and
* a ZooKeeper namespace.
*
- * @param <C> type of the ClusterClient which is returned
*/
-public abstract class AbstractCustomCommandLine<C extends ClusterClient> implements CustomCommandLine<C> {
+public abstract class AbstractCustomCommandLine implements CustomCommandLine {
protected final Option zookeeperNamespaceOption = new Option("z", "zookeeperNamespace", true,
"Namespace to create the Zookeeper sub-paths for high availability mode");
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 630154c..453d086 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -106,7 +106,7 @@ public class CliFrontend {
private final Configuration configuration;
- private final List<CustomCommandLine<?>> customCommandLines;
+ private final List<CustomCommandLine> customCommandLines;
private final Options customCommandLineOptions;
@@ -116,7 +116,7 @@ public class CliFrontend {
public CliFrontend(
Configuration configuration,
- List<CustomCommandLine<?>> customCommandLines) throws Exception {
+ List<CustomCommandLine> customCommandLines) throws Exception {
this.configuration = Preconditions.checkNotNull(configuration);
this.customCommandLines = Preconditions.checkNotNull(customCommandLines);
@@ -129,7 +129,7 @@ public class CliFrontend {
this.customCommandLineOptions = new Options();
- for (CustomCommandLine<?> customCommandLine : customCommandLines) {
+ for (CustomCommandLine customCommandLine : customCommandLines) {
customCommandLine.addGeneralOptions(customCommandLineOptions);
customCommandLine.addRunOptions(customCommandLineOptions);
}
@@ -196,9 +196,9 @@ public class CliFrontend {
throw new CliArgsException("Could not build the program from JAR file.", e);
}
- final CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
+ final CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
- final ClusterDescriptor<?> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
+ final ClusterDescriptor clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
try {
final String clusterId = customCommandLine.getClusterId(commandLine);
@@ -351,8 +351,8 @@ public class CliFrontend {
scheduled = true;
}
- final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
- final ClusterDescriptor<?> clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
+ final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
+ final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
final String clusterId = activeCommandLine.getClusterId(commandLine);
@@ -473,9 +473,9 @@ public class CliFrontend {
throw new CliArgsException("Missing JobID");
}
- final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
+ final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
- final ClusterDescriptor<?> clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
+ final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
final String clusterId = activeCommandLine.getClusterId(commandLine);
@@ -553,9 +553,9 @@ public class CliFrontend {
throw new CliArgsException("Missing JobID in the command line arguments.");
}
- final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
+ final CustomCommandLine activeCommandLine = getActiveCustomCommandLine(commandLine);
- final ClusterDescriptor<?> clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
+ final ClusterDescriptor clusterDescriptor = activeCommandLine.createClusterDescriptor(commandLine);
final String clusterId = activeCommandLine.getClusterId(commandLine);
@@ -617,9 +617,9 @@ public class CliFrontend {
return;
}
- CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(commandLine);
+ CustomCommandLine customCommandLine = getActiveCustomCommandLine(commandLine);
- final ClusterDescriptor<?> clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
+ final ClusterDescriptor clusterDescriptor = customCommandLine.createClusterDescriptor(commandLine);
final String clusterId = customCommandLine.getClusterId(commandLine);
@@ -972,7 +972,7 @@ public class CliFrontend {
final Configuration configuration = GlobalConfiguration.loadConfiguration(configurationDirectory);
// 3. load the custom command lines
- final List<CustomCommandLine<?>> customCommandLines = loadCustomCommandLines(
+ final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
configuration,
configurationDirectory);
@@ -1039,8 +1039,8 @@ public class CliFrontend {
config.setInteger(JobManagerOptions.PORT, address.getPort());
}
- public static List<CustomCommandLine<?>> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
- List<CustomCommandLine<?>> customCommandLines = new ArrayList<>(2);
+ public static List<CustomCommandLine> loadCustomCommandLines(Configuration configuration, String configurationDirectory) {
+ List<CustomCommandLine> customCommandLines = new ArrayList<>(2);
// Command line interface of the YARN session, with a special initialization here
// to prefix all options with y/yarn.
@@ -1087,7 +1087,7 @@ public class CliFrontend {
* @param className The fully-qualified class name to load.
* @param params The constructor parameters
*/
- private static CustomCommandLine<?> loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {
+ private static CustomCommandLine loadCustomCommandLine(String className, Object... params) throws IllegalAccessException, InvocationTargetException, InstantiationException, ClassNotFoundException, NoSuchMethodException {
Class<? extends CustomCommandLine> customCliClass =
Class.forName(className).asSubclass(CustomCommandLine.class);
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 475d854..10507d6 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -236,7 +236,7 @@ public class CliFrontendParser {
/**
* Prints the help for the client.
*/
- public static void printHelp(Collection<CustomCommandLine<?>> customCommandLines) {
+ public static void printHelp(Collection<CustomCommandLine> customCommandLines) {
System.out.println("./flink <ACTION> [OPTIONS] [ARGUMENTS]");
System.out.println();
System.out.println("The following actions are available:");
@@ -251,7 +251,7 @@ public class CliFrontendParser {
System.out.println();
}
- public static void printHelpForRun(Collection<CustomCommandLine<?>> customCommandLines) {
+ public static void printHelpForRun(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
@@ -279,7 +279,7 @@ public class CliFrontendParser {
System.out.println();
}
- public static void printHelpForList(Collection<CustomCommandLine<?>> customCommandLines) {
+ public static void printHelpForList(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
@@ -294,7 +294,7 @@ public class CliFrontendParser {
System.out.println();
}
- public static void printHelpForStop(Collection<CustomCommandLine<?>> customCommandLines) {
+ public static void printHelpForStop(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
@@ -309,7 +309,7 @@ public class CliFrontendParser {
System.out.println();
}
- public static void printHelpForCancel(Collection<CustomCommandLine<?>> customCommandLines) {
+ public static void printHelpForCancel(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
@@ -324,7 +324,7 @@ public class CliFrontendParser {
System.out.println();
}
- public static void printHelpForSavepoint(Collection<CustomCommandLine<?>> customCommandLines) {
+ public static void printHelpForSavepoint(Collection<CustomCommandLine> customCommandLines) {
HelpFormatter formatter = new HelpFormatter();
formatter.setLeftPadding(5);
formatter.setWidth(80);
@@ -345,7 +345,7 @@ public class CliFrontendParser {
* @param runOptions True if the run options should be printed, False to print only general options
*/
private static void printCustomCliOptions(
- Collection<CustomCommandLine<?>> customCommandLines,
+ Collection<CustomCommandLine> customCommandLines,
HelpFormatter formatter,
boolean runOptions) {
// prints options from all available command-line classes
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
index f642484..aabc224 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CustomCommandLine.java
@@ -20,7 +20,6 @@ package org.apache.flink.client.cli;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
-import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.util.FlinkException;
import org.apache.commons.cli.CommandLine;
@@ -31,7 +30,7 @@ import javax.annotation.Nullable;
/**
* Custom command-line interface to load hooks for the command-line interface.
*/
-public interface CustomCommandLine<ClusterType extends ClusterClient> {
+public interface CustomCommandLine {
/**
* Signals whether the custom command-line wants to execute or not.
@@ -67,7 +66,7 @@ public interface CustomCommandLine<ClusterType extends ClusterClient> {
* @return ClusterDescriptor
* @throws FlinkException if the ClusterDescriptor could not be created
*/
- ClusterDescriptor<ClusterType> createClusterDescriptor(CommandLine commandLine) throws FlinkException;
+ ClusterDescriptor createClusterDescriptor(CommandLine commandLine) throws FlinkException;
/**
* Returns the cluster id if a cluster id was specified on the command line, otherwise it
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
index c29c5b7..5660765 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/DefaultCLI.java
@@ -21,7 +21,6 @@ package org.apache.flink.client.cli;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.StandaloneClusterDescriptor;
-import org.apache.flink.client.program.StandaloneClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FlinkException;
@@ -32,7 +31,7 @@ import javax.annotation.Nullable;
/**
* The default CLI which is used for interaction with standalone clusters.
*/
-public class DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterClient> {
+public class DefaultCLI extends AbstractCustomCommandLine {
public DefaultCLI(Configuration configuration) {
super(configuration);
@@ -50,7 +49,7 @@ public class DefaultCLI extends AbstractCustomCommandLine<StandaloneClusterClien
}
@Override
- public ClusterDescriptor<StandaloneClusterClient> createClusterDescriptor(
+ public ClusterDescriptor createClusterDescriptor(
CommandLine commandLine) throws FlinkException {
final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine);
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
index 3adeca6..1a75aac 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/Flip6DefaultCLI.java
@@ -21,7 +21,6 @@ package org.apache.flink.client.cli;
import org.apache.flink.client.deployment.ClusterDescriptor;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.deployment.Flip6StandaloneClusterDescriptor;
-import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.FlinkException;
@@ -34,7 +33,7 @@ import javax.annotation.Nullable;
/**
* The default CLI which is used for interaction with standalone clusters.
*/
-public class Flip6DefaultCLI extends AbstractCustomCommandLine<RestClusterClient> {
+public class Flip6DefaultCLI extends AbstractCustomCommandLine {
public static final Option FLIP_6 = new Option("flip6", "Switches the client to Flip-6 mode.");
@@ -63,7 +62,7 @@ public class Flip6DefaultCLI extends AbstractCustomCommandLine<RestClusterClient
}
@Override
- public ClusterDescriptor<RestClusterClient> createClusterDescriptor(
+ public ClusterDescriptor createClusterDescriptor(
CommandLine commandLine) throws FlinkException {
final Configuration effectiveConfiguration = applyCommandLineOptionsToConfiguration(commandLine);
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
index 1603930..07eeabc 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/ClusterDescriptor.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
/**
* A descriptor to deploy a cluster (e.g. Yarn or Mesos) and return a Client for Cluster communication.
*/
-public interface ClusterDescriptor<ClientType extends ClusterClient> extends AutoCloseable {
+public interface ClusterDescriptor extends AutoCloseable {
/**
* Returns a String containing details about the cluster (NodeManagers, available memory, ...).
@@ -38,7 +38,7 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> extends Aut
* @return Client for the cluster
* @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
*/
- ClientType retrieve(String applicationID) throws UnsupportedOperationException;
+ ClusterClient retrieve(String applicationID) throws UnsupportedOperationException;
/**
* Triggers deployment of a cluster.
@@ -46,7 +46,7 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> extends Aut
* @return Client for the cluster
* @throws UnsupportedOperationException if this cluster descriptor doesn't support the operation
*/
- ClientType deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException;
+ ClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException;
/**
* Deploys a per-job cluster with the given job on the cluster.
@@ -56,7 +56,7 @@ public interface ClusterDescriptor<ClientType extends ClusterClient> extends Aut
* @return Cluster client to talk to the Flink cluster
* @throws ClusterDeploymentException if the cluster could not be deployed
*/
- ClientType deployJobCluster(
+ ClusterClient deployJobCluster(
final ClusterSpecification clusterSpecification,
final JobGraph jobGraph);
}
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
index b8eb534..a35a68b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/Flip6StandaloneClusterDescriptor.java
@@ -27,7 +27,7 @@ import org.apache.flink.util.Preconditions;
/**
* A deployment descriptor for an existing cluster.
*/
-public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor<RestClusterClient> {
+public class Flip6StandaloneClusterDescriptor implements ClusterDescriptor {
private final Configuration config;
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
index 3808efa..ae25194 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/deployment/StandaloneClusterDescriptor.java
@@ -26,7 +26,7 @@ import org.apache.flink.runtime.jobgraph.JobGraph;
/**
* A deployment descriptor for an existing cluster.
*/
-public class StandaloneClusterDescriptor implements ClusterDescriptor<StandaloneClusterClient> {
+public class StandaloneClusterDescriptor implements ClusterDescriptor {
private final Configuration config;
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
index e73b9c9..6eb005d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/DefaultCLITest.java
@@ -62,7 +62,7 @@ public class DefaultCLITest extends TestLogger {
final InetSocketAddress expectedAddress = new InetSocketAddress(localhost, port);
- final ClusterDescriptor<?> clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
+ final ClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
@@ -89,7 +89,7 @@ public class DefaultCLITest extends TestLogger {
CommandLine commandLine = defaultCLI.parseCommandLineOptions(args, false);
- final ClusterDescriptor<?> clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
+ final ClusterDescriptor clusterDescriptor = defaultCLI.createClusterDescriptor(commandLine);
final ClusterClient clusterClient = clusterDescriptor.retrieve(defaultCLI.getClusterId(commandLine));
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
index e46957b..d300055 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyClusterDescriptor.java
@@ -26,14 +26,12 @@ import org.apache.flink.util.Preconditions;
/**
* Dummy {@link ClusterDescriptor} implementation for testing purposes.
- *
- * @param <C> type of the returned {@link ClusterClient}
*/
-public class DummyClusterDescriptor<C extends ClusterClient> implements ClusterDescriptor<C> {
+public class DummyClusterDescriptor implements ClusterDescriptor {
- private final C clusterClient;
+ private final ClusterClient clusterClient;
- public DummyClusterDescriptor(C clusterClient) {
+ public DummyClusterDescriptor(ClusterClient clusterClient) {
this.clusterClient = Preconditions.checkNotNull(clusterClient);
}
@@ -43,17 +41,17 @@ public class DummyClusterDescriptor<C extends ClusterClient> implements ClusterD
}
@Override
- public C retrieve(String applicationID) throws UnsupportedOperationException {
+ public ClusterClient retrieve(String applicationID) throws UnsupportedOperationException {
return clusterClient;
}
@Override
- public C deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException {
+ public ClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws UnsupportedOperationException {
return clusterClient;
}
@Override
- public C deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
+ public ClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph) {
return clusterClient;
}
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
index a36e8e9..5279d85 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/DummyCustomCommandLine.java
@@ -31,13 +31,11 @@ import javax.annotation.Nullable;
/**
* Dummy implementation of the {@link CustomCommandLine} for testing purposes.
- *
- * @param <T> type of the returned cluster client
*/
-public class DummyCustomCommandLine<T extends ClusterClient> implements CustomCommandLine<T> {
- private final T clusterClient;
+public class DummyCustomCommandLine implements CustomCommandLine {
+ private final ClusterClient clusterClient;
- public DummyCustomCommandLine(T clusterClient) {
+ public DummyCustomCommandLine(ClusterClient clusterClient) {
this.clusterClient = Preconditions.checkNotNull(clusterClient);
}
@@ -62,8 +60,8 @@ public class DummyCustomCommandLine<T extends ClusterClient> implements CustomCo
}
@Override
- public ClusterDescriptor<T> createClusterDescriptor(CommandLine commandLine) {
- return new DummyClusterDescriptor<>(clusterClient);
+ public ClusterDescriptor createClusterDescriptor(CommandLine commandLine) {
+ return new DummyClusterDescriptor(clusterClient);
}
@Override
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java
----------------------------------------------------------------------
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java b/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java
index 00b460f..477293d 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/util/MockedCliFrontend.java
@@ -34,6 +34,6 @@ public class MockedCliFrontend extends CliFrontend {
public MockedCliFrontend(ClusterClient clusterClient) throws Exception {
super(
new Configuration(),
- Collections.singletonList(new DummyCustomCommandLine<>(clusterClient)));
+ Collections.singletonList(new DummyCustomCommandLine(clusterClient)));
}
}
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
index f347f94..5bed22e 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionCapacitySchedulerITCase.java
@@ -365,7 +365,7 @@ public class YARNSessionCapacitySchedulerITCase extends YarnTestBase {
@Test
public void testNonexistingQueueWARNmessage() {
LOG.info("Starting testNonexistingQueueWARNmessage()");
- addTestAppender(YarnClusterDescriptor.class, Level.WARN);
+ addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(),
"-t", flinkLibFolder.getAbsolutePath(),
"-n", "1",
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
index d5a9883..cc26350 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YARNSessionFIFOITCase.java
@@ -179,7 +179,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
@Ignore("The test is too resource consuming (8.5 GB of memory)")
@Test
public void testResourceComputation() {
- addTestAppender(YarnClusterDescriptor.class, Level.WARN);
+ addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
LOG.info("Starting testResourceComputation()");
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
"-n", "5",
@@ -207,7 +207,7 @@ public class YARNSessionFIFOITCase extends YarnTestBase {
@Ignore("The test is too resource consuming (8 GB of memory)")
@Test
public void testfullAlloc() {
- addTestAppender(YarnClusterDescriptor.class, Level.WARN);
+ addTestAppender(AbstractYarnClusterDescriptor.class, Level.WARN);
LOG.info("Starting testfullAlloc()");
runWithArgs(new String[]{"-j", flinkUberjar.getAbsolutePath(), "-t", flinkLibFolder.getAbsolutePath(),
"-n", "2",
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
index 0372319..0a977df 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/AbstractYarnClusterDescriptor.java
@@ -99,8 +99,8 @@ import static org.apache.flink.yarn.cli.FlinkYarnSessionCli.getDynamicProperties
/**
* The descriptor with deployment information for spawning or resuming a {@link YarnClusterClient}.
*/
-public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor<YarnClusterClient> {
- private static final Logger LOG = LoggerFactory.getLogger(YarnClusterDescriptor.class);
+public abstract class AbstractYarnClusterDescriptor implements ClusterDescriptor {
+ private static final Logger LOG = LoggerFactory.getLogger(AbstractYarnClusterDescriptor.class);
/**
* Minimum memory requirements, checked by the Client.
http://git-wip-us.apache.org/repos/asf/flink/blob/10e900b2/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
index d4ab41f..d797f47 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/cli/FlinkYarnSessionCli.java
@@ -81,7 +81,7 @@ import static org.apache.flink.configuration.HighAvailabilityOptions.HA_CLUSTER_
/**
* Class handling the command line interface to the YARN session.
*/
-public class FlinkYarnSessionCli extends AbstractCustomCommandLine<YarnClusterClient> {
+public class FlinkYarnSessionCli extends AbstractCustomCommandLine {
private static final Logger LOG = LoggerFactory.getLogger(FlinkYarnSessionCli.class);
//------------------------------------ Constants -------------------------