You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/11 23:19:53 UTC
[4/8] flink git commit: [FLINK-8338] [flip6] Make CustomCommandLines
non static in CliFrontend
http://git-wip-us.apache.org/repos/asf/flink/blob/aff43768/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index a255453..8572345 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -33,8 +33,10 @@ import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -74,6 +76,8 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
private static final PrintStream OUT = System.out;
private static final PrintStream ERR = System.err;
+ private static FlinkYarnSessionCli cli;
+
@BeforeClass
public static void disableStdOutErr() {
class NullPrint extends OutputStream {
@@ -89,6 +93,8 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
Map<String, String> map = new HashMap<>(System.getenv());
map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
TestBaseUtils.setEnv(map);
+
+ cli = new FlinkYarnSessionCli("y", "yarn");
}
@AfterClass
@@ -122,12 +128,21 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
+ final Configuration configuration = new Configuration();
+ configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
// start CLI Frontend
- TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+ TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath());
- RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
+ Options options = CliFrontendParser.getRunCommandOptions();
+ cli.addGeneralOptions(options);
+ cli.addRunOptions(options);
- frontend.retrieveClient(options);
+ final CommandLine commandLine = CliFrontendParser.parse(options, new String[] {}, true);
+
+ RunOptions runOptions = new RunOptions(commandLine);
+
+ frontend.retrieveClient(runOptions);
checkJobManagerAddress(
frontend.getConfiguration(),
TEST_YARN_JOB_MANAGER_ADDRESS,
@@ -140,8 +155,11 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
+ final Configuration configuration = new Configuration();
+ configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
// start CLI Frontend
- TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
+ TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
@@ -157,7 +175,10 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile);
- TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+ final Configuration configuration = new Configuration();
+ configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
+ TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath());
RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
@@ -174,8 +195,11 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
public void testResumeFromYarnID() throws Exception {
File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
+ final Configuration configuration = new Configuration();
+ configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
// start CLI Frontend
- TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+ TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath());
RunOptions options =
CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
@@ -191,13 +215,19 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
@Test
public void testResumeFromYarnIDZookeeperNamespace() throws Exception {
File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
+
// start CLI Frontend
- TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+ TestCLI frontend = new CustomYarnTestCLI(new Configuration(), directoryPath.getAbsolutePath());
- RunOptions options =
- CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
+ Options options = CliFrontendParser.getRunCommandOptions();
+ cli.addGeneralOptions(options);
+ cli.addRunOptions(options);
- frontend.retrieveClient(options);
+ final CommandLine commandLine = CliFrontendParser.parse(options, new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
+
+ RunOptions runOptions = new RunOptions(commandLine);
+
+ frontend.retrieveClient(runOptions);
String zkNs = frontend.getConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
Assert.assertTrue(zkNs.matches("application_\\d+_0042"));
}
@@ -205,13 +235,23 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
@Test
public void testResumeFromYarnIDZookeeperNamespaceOverride() throws Exception {
File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
+
+ final Configuration configuration = new Configuration();
+ configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
// start CLI Frontend
- TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+ TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath());
String overrideZkNamespace = "my_cluster";
- RunOptions options =
- CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace});
- frontend.retrieveClient(options);
+ Options options = CliFrontendParser.getRunCommandOptions();
+ cli.addGeneralOptions(options);
+ cli.addRunOptions(options);
+
+ final CommandLine commandLine = CliFrontendParser.parse(options, new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace}, true);
+
+ RunOptions runOptions = new RunOptions(commandLine);
+
+ frontend.retrieveClient(runOptions);
String zkNs = frontend.getConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
Assert.assertEquals(overrideZkNamespace, zkNs);
}
@@ -220,8 +260,11 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
public void testResumeFromInvalidYarnID() throws Exception {
File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
+ final Configuration configuration = new Configuration();
+ configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
// start CLI Frontend
- TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
+ TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
RunOptions options =
CliFrontendParser.parseRunCommand(new String[] {"-yid", ApplicationId.newInstance(0, 666).toString()});
@@ -237,8 +280,11 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
public void testResumeFromYarnIDWithFinishedApplication() throws Exception {
File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
+ final Configuration configuration = new Configuration();
+ configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
// start CLI Frontend
- TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
+ TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
RunOptions options =
CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
@@ -255,13 +301,21 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
public void testYarnIDOverridesPropertiesFile() throws Exception {
File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile);
+ final Configuration configuration = new Configuration();
+ configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
// start CLI Frontend
- TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+ TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath());
- RunOptions options =
- CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
+ Options options = CliFrontendParser.getRunCommandOptions();
+ cli.addGeneralOptions(options);
+ cli.addRunOptions(options);
- frontend.retrieveClient(options);
+ final CommandLine commandLine = CliFrontendParser.parse(options, new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
+
+ RunOptions runOptions = new RunOptions(commandLine);
+
+ frontend.retrieveClient(runOptions);
checkJobManagerAddress(
frontend.getConfiguration(),
@@ -276,7 +330,10 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
File testConfFile = new File(emptyFolder.getAbsolutePath(), "flink-conf.yaml");
Files.createFile(testConfFile.toPath());
- TestCLI frontend = new TestCLI(emptyFolder.getAbsolutePath());
+ final Configuration configuration = new Configuration();
+ configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, testConfFile.getAbsolutePath());
+
+ TestCLI frontend = new TestCLI(configuration, emptyFolder.getAbsolutePath());
RunOptions options = CliFrontendParser.parseRunCommand(new String[] {"-m", "10.221.130.22:7788"});
@@ -311,8 +368,11 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
}
private static class TestCLI extends CliFrontend {
- TestCLI(String configDir) throws Exception {
- super(configDir);
+ TestCLI(Configuration configuration, String configDir) throws Exception {
+ super(
+ configuration,
+ CliFrontend.loadCustomCommandLines(),
+ configDir);
}
@Override
@@ -336,12 +396,15 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
// the default application status for yarn applications to be retrieved
private final FinalApplicationStatus finalApplicationStatus;
- CustomYarnTestCLI(String configDir) throws Exception {
- this(configDir, FinalApplicationStatus.UNDEFINED);
+ CustomYarnTestCLI(Configuration configuration, String configDir) throws Exception {
+ this(configuration, configDir, FinalApplicationStatus.UNDEFINED);
}
- CustomYarnTestCLI(String configDir, FinalApplicationStatus finalApplicationStatus) throws Exception {
- super(configDir);
+ CustomYarnTestCLI(
+ Configuration configuration,
+ String configDir,
+ FinalApplicationStatus finalApplicationStatus) throws Exception {
+ super(configuration, configDir);
this.finalApplicationStatus = finalApplicationStatus;
}
@@ -369,7 +432,7 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
}
/**
- * Replace the YarnClient for this test.
+ * Replace the YarnClusterClient for this test.
*/
private class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
http://git-wip-us.apache.org/repos/asf/flink/blob/aff43768/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 8541c40..627a6a4 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.yarn;
import org.apache.flink.client.cli.CliFrontend;
import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.client.cli.RunOptions;
import org.apache.flink.client.deployment.ClusterSpecification;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.configuration.Configuration;
@@ -89,11 +88,15 @@ public class FlinkYarnSessionCliTest extends TestLogger {
String[] params =
new String[] {"-yn", "2", "-ys", "3", "-p", "7", jarFile.getAbsolutePath()};
- RunOptions runOptions = CliFrontendParser.parseRunCommand(params);
-
FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
- ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), runOptions.getCommandLine());
+ final Options options = CliFrontendParser.getRunCommandOptions();
+ yarnCLI.addRunOptions(options);
+ yarnCLI.addGeneralOptions(options);
+
+ CommandLine commandLine = CliFrontendParser.parse(options, params, true);
+
+ ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), commandLine);
// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
Assert.assertEquals(4, clusterSpecification.getSlotsPerTaskManager());
@@ -102,66 +105,67 @@ public class FlinkYarnSessionCliTest extends TestLogger {
@Test
public void testCorrectSettingOfMaxSlots() throws Exception {
-
- File confFile = tmp.newFile("flink-conf.yaml");
File jarFile = tmp.newFile("test.jar");
- CliFrontend cliFrontend = new CliFrontend(tmp.getRoot().getAbsolutePath());
-
- final Configuration config = cliFrontend.getConfiguration();
String[] params =
new String[] {"-yn", "2", "-ys", "3", jarFile.getAbsolutePath()};
- RunOptions runOptions = CliFrontendParser.parseRunCommand(params);
-
FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
+ final Options options = CliFrontendParser.getRunCommandOptions();
+ yarnCLI.addRunOptions(options);
+ yarnCLI.addGeneralOptions(options);
+
+ CommandLine commandLine = CliFrontendParser.parse(options, params, true);
+
final Configuration configuration = new Configuration();
AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor(
configuration,
tmp.getRoot().getAbsolutePath(),
"",
- runOptions.getCommandLine());
+ commandLine);
final ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(
configuration,
- runOptions.getCommandLine());
+ commandLine);
// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
Assert.assertEquals(3, clusterSpecification.getSlotsPerTaskManager());
Assert.assertEquals(2, clusterSpecification.getNumberTaskManagers());
- CliFrontend.setJobManagerAddressInConfig(config, new InetSocketAddress("localhost", 9000));
+ CliFrontend.setJobManagerAddressInConfig(configuration, new InetSocketAddress("localhost", 9000));
ClusterClient client = new TestingYarnClusterClient(
descriptor,
clusterSpecification.getNumberTaskManagers(),
clusterSpecification.getSlotsPerTaskManager(),
- config);
+ configuration);
Assert.assertEquals(6, client.getMaxSlots());
}
@Test
public void testZookeeperNamespaceProperty() throws Exception {
- File confFile = tmp.newFile("flink-conf.yaml");
File jarFile = tmp.newFile("test.jar");
- CliFrontend cliFrontend = new CliFrontend(tmp.getRoot().getAbsolutePath());
- final Configuration configuration = cliFrontend.getConfiguration();
String zkNamespaceCliInput = "flink_test_namespace";
String[] params =
new String[] {"-yn", "2", "-yz", zkNamespaceCliInput, jarFile.getAbsolutePath()};
- RunOptions runOptions = CliFrontendParser.parseRunCommand(params);
-
FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
+
+ final Options options = CliFrontendParser.getRunCommandOptions();
+ yarnCLI.addRunOptions(options);
+ yarnCLI.addGeneralOptions(options);
+
+ CommandLine commandLine = CliFrontendParser.parse(options, params, true);
+
AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor(
- configuration,
+ new Configuration(),
tmp.getRoot().getAbsolutePath(),
"",
- runOptions.getCommandLine());
+ commandLine);
Assert.assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
}
http://git-wip-us.apache.org/repos/asf/flink/blob/aff43768/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index d9872a0..f242752 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.client.cli.CustomCommandLine;
import org.apache.flink.client.program.ClusterClient;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.configuration.SecurityOptions;
import org.apache.flink.runtime.instance.ActorGateway;
import org.apache.flink.test.util.TestBaseUtils;
@@ -808,7 +809,12 @@ public abstract class YarnTestBase extends TestLogger {
private ClusterClient originalClusterClient;
private ClusterClient spiedClusterClient;
- public TestingCLI() throws Exception {}
+ public TestingCLI() throws Exception {
+ super(
+ GlobalConfiguration.loadConfiguration(CliFrontend.getConfigurationDirectoryFromEnv()),
+ CliFrontend.loadCustomCommandLines(),
+ CliFrontend.getConfigurationDirectoryFromEnv());
+ }
@Override
protected ClusterClient createClient(CustomCommandLine<?> customCommandLine, CommandLine commandLine, PackagedProgram program) throws Exception {