You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@flink.apache.org by tr...@apache.org on 2018/01/11 23:19:53 UTC

[4/8] flink git commit: [FLINK-8338] [flip6] Make CustomCommandLines non static in CliFrontend

http://git-wip-us.apache.org/repos/asf/flink/blob/aff43768/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
index a255453..8572345 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/CliFrontendYarnAddressConfigurationTest.java
@@ -33,8 +33,10 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.test.util.TestBaseUtils;
 import org.apache.flink.util.TestLogger;
 import org.apache.flink.yarn.cli.FlinkYarnSessionCli;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
 
 import org.apache.commons.cli.CommandLine;
+import org.apache.commons.cli.Options;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationReport;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
@@ -74,6 +76,8 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 	private static final PrintStream OUT = System.out;
 	private static final PrintStream ERR = System.err;
 
+	private static FlinkYarnSessionCli cli;
+
 	@BeforeClass
 	public static void disableStdOutErr() {
 		class NullPrint extends OutputStream {
@@ -89,6 +93,8 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 		Map<String, String> map = new HashMap<>(System.getenv());
 		map.remove(ConfigConstants.ENV_FLINK_CONF_DIR);
 		TestBaseUtils.setEnv(map);
+
+		cli = new FlinkYarnSessionCli("y", "yarn");
 	}
 
 	@AfterClass
@@ -122,12 +128,21 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 
 		File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
 
+		final Configuration configuration = new Configuration();
+		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
 		// start CLI Frontend
-		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+		TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath());
 
-		RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
+		Options options = CliFrontendParser.getRunCommandOptions();
+		cli.addGeneralOptions(options);
+		cli.addRunOptions(options);
 
-		frontend.retrieveClient(options);
+		final CommandLine commandLine = CliFrontendParser.parse(options, new String[] {}, true);
+
+		RunOptions runOptions = new RunOptions(commandLine);
+
+		frontend.retrieveClient(runOptions);
 		checkJobManagerAddress(
 			frontend.getConfiguration(),
 			TEST_YARN_JOB_MANAGER_ADDRESS,
@@ -140,8 +155,11 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 
 		File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
 
+		final Configuration configuration = new Configuration();
+		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
 		// start CLI Frontend
-		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
+		TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
 
 		RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
 
@@ -157,7 +175,10 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 
 		File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile);
 
-		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+		final Configuration configuration = new Configuration();
+		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
+		TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath());
 
 		RunOptions options = CliFrontendParser.parseRunCommand(new String[] {});
 
@@ -174,8 +195,11 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 	public void testResumeFromYarnID() throws Exception {
 		File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
 
+		final Configuration configuration = new Configuration();
+		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
 		// start CLI Frontend
-		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+		TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath());
 
 		RunOptions options =
 			CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
@@ -191,13 +215,19 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 	@Test
 	public void testResumeFromYarnIDZookeeperNamespace() throws Exception {
 		File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
+
 		// start CLI Frontend
-		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+		TestCLI frontend = new CustomYarnTestCLI(new Configuration(), directoryPath.getAbsolutePath());
 
-		RunOptions options =
-				CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
+		Options options = CliFrontendParser.getRunCommandOptions();
+		cli.addGeneralOptions(options);
+		cli.addRunOptions(options);
 
-		frontend.retrieveClient(options);
+		final CommandLine commandLine = CliFrontendParser.parse(options, new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
+
+		RunOptions runOptions = new RunOptions(commandLine);
+
+		frontend.retrieveClient(runOptions);
 		String zkNs = frontend.getConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
 		Assert.assertTrue(zkNs.matches("application_\\d+_0042"));
 	}
@@ -205,13 +235,23 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 	@Test
 	public void testResumeFromYarnIDZookeeperNamespaceOverride() throws Exception {
 		File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
+
+		final Configuration configuration = new Configuration();
+		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
 		// start CLI Frontend
-		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+		TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath());
 		String overrideZkNamespace = "my_cluster";
-		RunOptions options =
-				CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace});
 
-		frontend.retrieveClient(options);
+		Options options = CliFrontendParser.getRunCommandOptions();
+		cli.addGeneralOptions(options);
+		cli.addRunOptions(options);
+
+		final CommandLine commandLine = CliFrontendParser.parse(options, new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString(), "-yz", overrideZkNamespace}, true);
+
+		RunOptions runOptions = new RunOptions(commandLine);
+
+		frontend.retrieveClient(runOptions);
 		String zkNs = frontend.getConfiguration().getValue(HighAvailabilityOptions.HA_CLUSTER_ID);
 		Assert.assertEquals(overrideZkNamespace, zkNs);
 	}
@@ -220,8 +260,11 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 	public void testResumeFromInvalidYarnID() throws Exception {
 		File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
 
+		final Configuration configuration = new Configuration();
+		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
 		// start CLI Frontend
-		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
+		TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
 
 		RunOptions options =
 			CliFrontendParser.parseRunCommand(new String[] {"-yid", ApplicationId.newInstance(0, 666).toString()});
@@ -237,8 +280,11 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 	public void testResumeFromYarnIDWithFinishedApplication() throws Exception {
 		File directoryPath = writeYarnPropertiesFile(validPropertiesFile);
 
+		final Configuration configuration = new Configuration();
+		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
 		// start CLI Frontend
-		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
+		TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath(), FinalApplicationStatus.SUCCEEDED);
 
 		RunOptions options =
 			CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
@@ -255,13 +301,21 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 	public void testYarnIDOverridesPropertiesFile() throws Exception {
 		File directoryPath = writeYarnPropertiesFile(invalidPropertiesFile);
 
+		final Configuration configuration = new Configuration();
+		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, directoryPath.getAbsolutePath());
+
 		// start CLI Frontend
-		TestCLI frontend = new CustomYarnTestCLI(directoryPath.getAbsolutePath());
+		TestCLI frontend = new CustomYarnTestCLI(configuration, directoryPath.getAbsolutePath());
 
-		RunOptions options =
-			CliFrontendParser.parseRunCommand(new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()});
+		Options options = CliFrontendParser.getRunCommandOptions();
+		cli.addGeneralOptions(options);
+		cli.addRunOptions(options);
 
-		frontend.retrieveClient(options);
+		final CommandLine commandLine = CliFrontendParser.parse(options, new String[] {"-yid", TEST_YARN_APPLICATION_ID.toString()}, true);
+
+		RunOptions runOptions = new RunOptions(commandLine);
+
+		frontend.retrieveClient(runOptions);
 
 		checkJobManagerAddress(
 			frontend.getConfiguration(),
@@ -276,7 +330,10 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 		File testConfFile = new File(emptyFolder.getAbsolutePath(), "flink-conf.yaml");
 		Files.createFile(testConfFile.toPath());
 
-		TestCLI frontend = new TestCLI(emptyFolder.getAbsolutePath());
+		final Configuration configuration = new Configuration();
+		configuration.setString(YarnConfigOptions.PROPERTIES_FILE_LOCATION, testConfFile.getAbsolutePath());
+
+		TestCLI frontend = new TestCLI(configuration, emptyFolder.getAbsolutePath());
 
 		RunOptions options = CliFrontendParser.parseRunCommand(new String[] {"-m", "10.221.130.22:7788"});
 
@@ -311,8 +368,11 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 	}
 
 	private static class TestCLI extends CliFrontend {
-		TestCLI(String configDir) throws Exception {
-			super(configDir);
+		TestCLI(Configuration configuration, String configDir) throws Exception {
+			super(
+				configuration,
+				CliFrontend.loadCustomCommandLines(),
+				configDir);
 		}
 
 		@Override
@@ -336,12 +396,15 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 		// the default application status for yarn applications to be retrieved
 		private final FinalApplicationStatus finalApplicationStatus;
 
-		CustomYarnTestCLI(String configDir) throws Exception {
-			this(configDir, FinalApplicationStatus.UNDEFINED);
+		CustomYarnTestCLI(Configuration configuration, String configDir) throws Exception {
+			this(configuration, configDir, FinalApplicationStatus.UNDEFINED);
 		}
 
-		CustomYarnTestCLI(String configDir, FinalApplicationStatus finalApplicationStatus) throws Exception {
-			super(configDir);
+		CustomYarnTestCLI(
+				Configuration configuration,
+				String configDir,
+				FinalApplicationStatus finalApplicationStatus) throws Exception {
+			super(configuration, configDir);
 			this.finalApplicationStatus = finalApplicationStatus;
 		}
 
@@ -369,7 +432,7 @@ public class CliFrontendYarnAddressConfigurationTest extends TestLogger {
 			}
 
 			/**
-			 * Replace the YarnClient for this test.
+			 * Replace the YarnClusterClient for this test.
 			 */
 			private class TestingYarnClusterDescriptor extends YarnClusterDescriptor {
 

http://git-wip-us.apache.org/repos/asf/flink/blob/aff43768/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
index 8541c40..627a6a4 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/FlinkYarnSessionCliTest.java
@@ -20,7 +20,6 @@ package org.apache.flink.yarn;
 
 import org.apache.flink.client.cli.CliFrontend;
 import org.apache.flink.client.cli.CliFrontendParser;
-import org.apache.flink.client.cli.RunOptions;
 import org.apache.flink.client.deployment.ClusterSpecification;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.configuration.Configuration;
@@ -89,11 +88,15 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 		String[] params =
 			new String[] {"-yn", "2", "-ys", "3", "-p", "7", jarFile.getAbsolutePath()};
 
-		RunOptions runOptions = CliFrontendParser.parseRunCommand(params);
-
 		FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
 
-		ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), runOptions.getCommandLine());
+		final Options options = CliFrontendParser.getRunCommandOptions();
+		yarnCLI.addRunOptions(options);
+		yarnCLI.addGeneralOptions(options);
+
+		CommandLine commandLine = CliFrontendParser.parse(options, params, true);
+
+		ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(new Configuration(), commandLine);
 
 		// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
 		Assert.assertEquals(4, clusterSpecification.getSlotsPerTaskManager());
@@ -102,66 +105,67 @@ public class FlinkYarnSessionCliTest extends TestLogger {
 
 	@Test
 	public void testCorrectSettingOfMaxSlots() throws Exception {
-
-		File confFile = tmp.newFile("flink-conf.yaml");
 		File jarFile = tmp.newFile("test.jar");
-		CliFrontend cliFrontend = new CliFrontend(tmp.getRoot().getAbsolutePath());
-
-		final Configuration config = cliFrontend.getConfiguration();
 
 		String[] params =
 			new String[] {"-yn", "2", "-ys", "3", jarFile.getAbsolutePath()};
 
-		RunOptions runOptions = CliFrontendParser.parseRunCommand(params);
-
 		FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
 
+		final Options options = CliFrontendParser.getRunCommandOptions();
+		yarnCLI.addRunOptions(options);
+		yarnCLI.addGeneralOptions(options);
+
+		CommandLine commandLine = CliFrontendParser.parse(options, params, true);
+
 		final Configuration configuration = new Configuration();
 
 		AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor(
 			configuration,
 			tmp.getRoot().getAbsolutePath(),
 			"",
-			runOptions.getCommandLine());
+			commandLine);
 
 		final ClusterSpecification clusterSpecification = yarnCLI.createClusterSpecification(
 			configuration,
-			runOptions.getCommandLine());
+			commandLine);
 
 		// each task manager has 3 slots but the parallelism is 7. Thus the slots should be increased.
 		Assert.assertEquals(3, clusterSpecification.getSlotsPerTaskManager());
 		Assert.assertEquals(2, clusterSpecification.getNumberTaskManagers());
 
-		CliFrontend.setJobManagerAddressInConfig(config, new InetSocketAddress("localhost", 9000));
+		CliFrontend.setJobManagerAddressInConfig(configuration, new InetSocketAddress("localhost", 9000));
 		ClusterClient client = new TestingYarnClusterClient(
 			descriptor,
 			clusterSpecification.getNumberTaskManagers(),
 			clusterSpecification.getSlotsPerTaskManager(),
-			config);
+			configuration);
 		Assert.assertEquals(6, client.getMaxSlots());
 	}
 
 	@Test
 	public void testZookeeperNamespaceProperty() throws Exception {
 
-		File confFile = tmp.newFile("flink-conf.yaml");
 		File jarFile = tmp.newFile("test.jar");
-		CliFrontend cliFrontend = new CliFrontend(tmp.getRoot().getAbsolutePath());
-		final Configuration configuration = cliFrontend.getConfiguration();
 
 		String zkNamespaceCliInput = "flink_test_namespace";
 
 		String[] params =
 				new String[] {"-yn", "2", "-yz", zkNamespaceCliInput, jarFile.getAbsolutePath()};
 
-		RunOptions runOptions = CliFrontendParser.parseRunCommand(params);
-
 		FlinkYarnSessionCli yarnCLI = new TestCLI("y", "yarn");
+
+		final Options options = CliFrontendParser.getRunCommandOptions();
+		yarnCLI.addRunOptions(options);
+		yarnCLI.addGeneralOptions(options);
+
+		CommandLine commandLine = CliFrontendParser.parse(options, params, true);
+
 		AbstractYarnClusterDescriptor descriptor = yarnCLI.createDescriptor(
-			configuration,
+			new Configuration(),
 			tmp.getRoot().getAbsolutePath(),
 			"",
-			runOptions.getCommandLine());
+			commandLine);
 
 		Assert.assertEquals(zkNamespaceCliInput, descriptor.getZookeeperNamespace());
 	}

http://git-wip-us.apache.org/repos/asf/flink/blob/aff43768/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
----------------------------------------------------------------------
diff --git a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
index d9872a0..f242752 100644
--- a/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
+++ b/flink-yarn-tests/src/test/java/org/apache/flink/yarn/YarnTestBase.java
@@ -23,6 +23,7 @@ import org.apache.flink.client.cli.CustomCommandLine;
 import org.apache.flink.client.program.ClusterClient;
 import org.apache.flink.client.program.PackagedProgram;
 import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.GlobalConfiguration;
 import org.apache.flink.configuration.SecurityOptions;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.test.util.TestBaseUtils;
@@ -808,7 +809,12 @@ public abstract class YarnTestBase extends TestLogger {
 		private ClusterClient originalClusterClient;
 		private ClusterClient spiedClusterClient;
 
-		public TestingCLI() throws Exception {}
+		public TestingCLI() throws Exception {
+			super(
+				GlobalConfiguration.loadConfiguration(CliFrontend.getConfigurationDirectoryFromEnv()),
+				CliFrontend.loadCustomCommandLines(),
+				CliFrontend.getConfigurationDirectoryFromEnv());
+		}
 
 		@Override
 		protected ClusterClient createClient(CustomCommandLine<?> customCommandLine, CommandLine commandLine, PackagedProgram program) throws Exception {