You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@submarine.apache.org by pi...@apache.org on 2021/10/15 08:49:59 UTC
[submarine] branch master updated: SUBMARINE-1051. remove hadoop
dependency and profile
This is an automated email from the ASF dual-hosted git repository.
pingsutw pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/submarine.git
The following commit(s) were added to refs/heads/master by this push:
new 9d54a09 SUBMARINE-1051. remove hadoop dependency and profile
9d54a09 is described below
commit 9d54a09f5cece9c54a8c6df3b9245a31907e1e0b
Author: Brandon Lin <fa...@gmail.com>
AuthorDate: Mon Oct 11 16:49:56 2021 +0800
SUBMARINE-1051. remove hadoop dependency and profile
### What is this PR for?
As mentioned in the ticket, this PR removes the Hadoop dependencies and the Hadoop Maven profiles.
### What type of PR is it?
Improvement
### Todos
* Since the Hadoop profiles have been removed, the Maven commands in all documents should be checked and any profile argument (e.g. `-P hadoop-2.9`) removed.
### What is the Jira issue?
https://issues.apache.org/jira/browse/SUBMARINE-1051
### How should this be tested?
This PR has passed the tests in submarine-test:test-k8s and submarine-test:e2e.
### Screenshots (if appropriate)
(The tests passed, but the run still failed because of a missing file.)
<img width="1678" alt="截圖 2021-10-11 上午10 45 24" src="https://user-images.githubusercontent.com/5687317/136763304-ecca0207-4f8d-4dda-a3af-7cdadb1d9048.png">
<img width="1679" alt="截圖 2021-10-11 下午5 03 24" src="https://user-images.githubusercontent.com/5687317/136763367-27e35570-623b-41f3-84cd-8322bd3cf3f6.png">
### Questions:
* Do the license files need updating? No
* Are there breaking changes for older versions? No
* Does this need new documentation? No
Author: Brandon Lin <fa...@gmail.com>
Signed-off-by: Kevin <pi...@apache.org>
Closes #773 from FatalLin/SUBMARINE-1051 and squashes the following commits:
e824eb21 [Brandon Lin] remove hadoop dependency and profile
---
pom.xml | 101 +---
submarine-client/pom.xml | 61 ---
.../apache/submarine/client/cli/AbstractCli.java | 52 --
.../org/apache/submarine/client/cli/CliUtils.java | 158 -------
.../apache/submarine/client/cli/ShowJobCli.java | 131 ------
.../submarine/client/cli/param/Localization.java | 137 ------
.../client/cli/param/ParametersHolder.java | 524 ---------------------
.../submarine/client/cli/param/Quicklink.java | 96 ----
.../submarine/client/cli/param/RunParameters.java | 111 -----
.../client/cli/param/ShowJobParameters.java | 25 -
.../cli/param/runjob/MXNetRunJobParameters.java | 236 ----------
.../cli/param/runjob/PyTorchRunJobParameters.java | 136 ------
.../client/cli/param/runjob/RunJobParameters.java | 361 --------------
.../param/runjob/TensorFlowRunJobParameters.java | 261 ----------
.../client/cli/runjob/RoleParameters.java | 88 ----
.../submarine/client/cli/runjob/RunJobCli.java | 371 ---------------
.../client/cli/ShowJobCliParsingTest.java | 104 ----
.../submarine/client/cli/YamlConfigTestUtils.java | 69 ---
.../cli/runjob/RunJobCliParsingCommonTest.java | 132 ------
.../cli/runjob/RunJobCliParsingCommonYamlTest.java | 258 ----------
.../runjob/RunJobCliParsingParameterizedTest.java | 196 --------
.../runjob/mxnet/RunJobCliParsingMXNetTest.java | 175 -------
.../mxnet/RunJobCliParsingMXNetYamlTest.java | 271 -----------
.../pytorch/RunJobCliParsingPyTorchTest.java | 282 -----------
.../pytorch/RunJobCliParsingPyTorchYamlTest.java | 271 -----------
.../tensorflow/RunJobCliParsingTensorFlowTest.java | 239 ----------
...nJobCliParsingTensorFlowYamlStandaloneTest.java | 206 --------
.../RunJobCliParsingTensorFlowYamlTest.java | 308 ------------
submarine-commons/commons-runtime/pom.xml | 115 -----
.../submarine/commons/runtime/ClientContext.java | 84 ----
.../submarine/commons/runtime/JobMonitor.java | 94 ----
.../submarine/commons/runtime/JobSubmitter.java | 42 --
.../submarine/commons/runtime/RuntimeFactory.java | 104 ----
.../runtime/fs/DefaultRemoteDirectoryManager.java | 168 -------
.../runtime/fs/FSBasedSubmarineStorageImpl.java | 105 -----
.../commons/runtime/fs/RemoteDirectoryManager.java | 53 ---
.../commons/runtime/param/BaseParameters.java | 71 ---
.../submarine/commons/runtime/param/Parameter.java | 45 --
.../commons/runtime/resource/ResourceUtils.java | 360 --------------
.../commons/runtime/MockClientContext.java | 44 --
.../runtime/fs/MockRemoteDirectoryManager.java | 212 ---------
submarine-dist/pom.xml | 12 +-
submarine-test/test-e2e/pom.xml | 2 +-
43 files changed, 7 insertions(+), 6864 deletions(-)
diff --git a/pom.xml b/pom.xml
index 3727f5a..3538d01 100644
--- a/pom.xml
+++ b/pom.xml
@@ -92,7 +92,6 @@
<npm.version>6.14.11</npm.version>
<io.swagger.version>2.1.2</io.swagger.version>
- <hadoop.common.build.dir>${basedir}/../hadoop-common-project/hadoop-common/target</hadoop.common.build.dir>
<slf4j.version>1.7.25</slf4j.version>
<log4j.version>1.2.17</log4j.version>
<commons.logging.version>1.1.3</commons.logging.version>
@@ -137,6 +136,10 @@
<plugin.failsafe.version>2.17</plugin.failsafe.version>
<!-- embedded-ldap-junit -->
<embedded-ldap-junit.version>0.7</embedded-ldap-junit.version>
+ <jaxb-api.version>2.2.11</jaxb-api.version>
+ <commons-compress.version>1.4.1</commons-compress.version>
+ <guice-servlet.version>3.0</guice-servlet.version>
+ <guice.version>3.0</guice.version>
</properties>
<modules>
@@ -192,37 +195,6 @@
<version>${k8s.client-java.version}</version>
</dependency>
- <!-- Submarine on YARN -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-services-api</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-common</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-client</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-hdfs</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -283,11 +255,7 @@
<artifactId>commons-lang</artifactId>
<version>${commons-lang.version}</version>
</dependency>
- <dependency>
- <groupId>org.apache.zookeeper</groupId>
- <artifactId>zookeeper</artifactId>
- <version>${zookeeper.version}</version>
- </dependency>
+
<dependency>
<groupId>commons-httpclient</groupId>
<artifactId>commons-httpclient</artifactId>
@@ -342,65 +310,6 @@
</dependencyManagement>
<profiles>
- <profile>
- <id>hadoop-3.2</id>
- <properties>
- <hadoop.version>3.2.1</hadoop.version>
- <jaxb-api.version>2.2.11</jaxb-api.version>
- <commons-compress.version>1.19</commons-compress.version>
- <guice-servlet.version>4.0</guice-servlet.version>
- <guice.version>4.0</guice.version>
- <zookeeper.version>3.4.13</zookeeper.version>
- <guava.version>30.0-jre</guava.version>
- <jsr305.version>3.0.2</jsr305.version>
- <profile-id>hadoop-3.2</profile-id>
- </properties>
- </profile>
-
- <profile>
- <id>hadoop-3.1</id>
- <properties>
- <hadoop.version>3.1.3</hadoop.version>
- <jaxb-api.version>2.2.11</jaxb-api.version>
- <commons-compress.version>1.19</commons-compress.version>
- <guice-servlet.version>4.0</guice-servlet.version>
- <guice.version>4.0</guice.version>
- <zookeeper.version>3.4.13</zookeeper.version>
- <profile-id>hadoop-3.1</profile-id>
- </properties>
- </profile>
-
- <profile>
- <id>hadoop-2.10</id>
- <activation>
- <activeByDefault>false</activeByDefault>
- </activation>
- <properties>
- <hadoop.version>2.10.0</hadoop.version>
- <jaxb-api.version>2.2.2</jaxb-api.version>
- <commons-compress.version>1.19</commons-compress.version>
- <guice-servlet.version>3.0</guice-servlet.version>
- <guice.version>3.0</guice.version>
- <zookeeper.version>3.4.9</zookeeper.version>
- <profile-id>hadoop-2.10</profile-id>
- </properties>
- </profile>
-
- <profile>
- <id>hadoop-2.9</id>
- <activation>
- <activeByDefault>true</activeByDefault>
- </activation>
- <properties>
- <hadoop.version>2.9.2</hadoop.version>
- <jaxb-api.version>2.2.11</jaxb-api.version>
- <commons-compress.version>1.4.1</commons-compress.version>
- <guice-servlet.version>3.0</guice-servlet.version>
- <guice.version>3.0</guice.version>
- <zookeeper.version>3.4.6</zookeeper.version>
- <profile-id>hadoop-2.9</profile-id>
- </properties>
- </profile>
<profile>
<id>clover</id>
diff --git a/submarine-client/pom.xml b/submarine-client/pom.xml
index 8f74fe6..e6b119d 100644
--- a/submarine-client/pom.xml
+++ b/submarine-client/pom.xml
@@ -64,67 +64,6 @@
<scope>test</scope>
</dependency>
- <!-- Dependencies for Hadoop -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.protobuf</groupId>
- <artifactId>protobuf-java</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
</dependencies>
</project>
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/AbstractCli.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/AbstractCli.java
deleted file mode 100644
index 177bf16..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/AbstractCli.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.util.Tool;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-
-import java.io.IOException;
-
-public abstract class AbstractCli implements Tool {
- protected ClientContext clientContext;
-
- public AbstractCli(ClientContext cliContext) {
- this.clientContext = cliContext;
- }
-
- @Override
- public abstract int run(String[] args)
- throws ParseException, IOException, YarnException, InterruptedException,
- SubmarineException;
-
- @Override
- public void setConf(Configuration conf) {
- clientContext.setYarnConfig(conf);
- }
-
- @Override
- public Configuration getConf() {
- return clientContext.getYarnConfig();
- }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/CliUtils.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/CliUtils.java
deleted file mode 100644
index 222de1f..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/CliUtils.java
+++ /dev/null
@@ -1,158 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.submarine.commons.runtime.fs.RemoteDirectoryManager;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-public class CliUtils {
- private static final Logger LOG =
- LoggerFactory.getLogger(CliUtils.class);
-
- /**
- * Replace patterns inside cli
- *
- * @return launch command after pattern replace
- */
- public static String replacePatternsInLaunchCommand(String specifiedCli,
- RunJobParameters jobRunParameters,
- RemoteDirectoryManager directoryManager)
- throws IOException {
- String input = jobRunParameters.getInputPath();
- String jobDir = jobRunParameters.getCheckpointPath();
- String savedModelDir = jobRunParameters.getSavedModelPath();
-
- Map<String, String> replacePattern = new HashMap<>();
- if (jobDir != null) {
- replacePattern.put("%" + CliConstants.CHECKPOINT_PATH + "%", jobDir);
- }
- if (input != null) {
- replacePattern.put("%" + CliConstants.INPUT_PATH + "%", input);
- }
- if (savedModelDir != null) {
- replacePattern.put("%" + CliConstants.SAVED_MODEL_PATH + "%",
- savedModelDir);
- }
-
- String newCli = specifiedCli;
- for (Map.Entry<String, String> replace : replacePattern.entrySet()) {
- newCli = newCli.replace(replace.getKey(), replace.getValue());
- }
-
- return newCli;
- }
-
- // Is it for help?
- public static boolean argsForHelp(String[] args) {
- if (args == null || args.length == 0)
- return true;
-
- if (args.length == 1) {
- return args[0].equals("-h") || args[0].equals("--help");
- }
-
- return false;
- }
-
- public static void doLoginIfSecure(String keytab, String principal) throws
- IOException {
- if (!UserGroupInformation.isSecurityEnabled()) {
- return;
- }
-
- if (StringUtils.isEmpty(keytab) || StringUtils.isEmpty(principal)) {
- if (StringUtils.isNotEmpty(keytab)) {
- SubmarineRuntimeException e = new SubmarineRuntimeException("The " +
- "parameter of " + CliConstants.PRINCIPAL + " is missing.");
- LOG.error(e.getMessage(), e);
- throw e;
- }
-
- if (StringUtils.isNotEmpty(principal)) {
- SubmarineRuntimeException e = new SubmarineRuntimeException("The " +
- "parameter of " + CliConstants.KEYTAB + " is missing.");
- LOG.error(e.getMessage(), e);
- throw e;
- }
-
- UserGroupInformation user = UserGroupInformation.getCurrentUser();
- if (user == null || user.getAuthenticationMethod() ==
- UserGroupInformation.AuthenticationMethod.SIMPLE) {
- SubmarineRuntimeException e = new SubmarineRuntimeException("Failed " +
- "to authenticate in secure environment. Please run kinit " +
- "command in advance or use " + "--" + CliConstants.KEYTAB + "/--" + CliConstants.PRINCIPAL +
- " parameters");
- LOG.error(e.getMessage(), e);
- throw e;
- }
- LOG.info("Submarine job is submitted by user: " + user.getUserName());
- return;
- }
-
- File keytabFile = new File(keytab);
- if (!keytabFile.exists()) {
- SubmarineRuntimeException e = new SubmarineRuntimeException("No " +
- "keytab localized at " + keytab);
- LOG.error(e.getMessage(), e);
- throw e;
- }
- UserGroupInformation.loginUserFromKeytab(principal, keytab);
- }
-
- /**
- * As hadoop-2.7 doesn't have this method, we add this method in submarine.
- * @param appIdStr
- * @return
- */
- public static ApplicationId fromString(String appIdStr) {
- String APPLICATION_ID_PREFIX = "application_";
- if (!appIdStr.startsWith(APPLICATION_ID_PREFIX)) {
- throw new IllegalArgumentException("Invalid ApplicationId prefix: "
- + appIdStr + ". The valid ApplicationId should start with prefix "
- + "application");
- }
- try {
- int pos1 = APPLICATION_ID_PREFIX.length() - 1;
- int pos2 = appIdStr.indexOf('_', pos1 + 1);
- if (pos2 < 0) {
- throw new IllegalArgumentException("Invalid ApplicationId: "
- + appIdStr);
- }
- long rmId = Long.parseLong(appIdStr.substring(pos1 + 1, pos2));
- int appId = Integer.parseInt(appIdStr.substring(pos2 + 1));
- ApplicationId applicationId = ApplicationId.newInstance(rmId, appId);
- return applicationId;
- } catch (NumberFormatException n) {
- throw new IllegalArgumentException("Invalid ApplicationId: "
- + appIdStr, n);
- }
- }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/ShowJobCli.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/ShowJobCli.java
deleted file mode 100644
index 651a542..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/ShowJobCli.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.param.ParametersHolder;
-import org.apache.submarine.client.cli.param.ShowJobParameters;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-import org.apache.submarine.commons.runtime.fs.StorageKeyConstants;
-import org.apache.submarine.commons.runtime.fs.SubmarineStorage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Map;
-
-public class ShowJobCli extends AbstractCli {
- private static final Logger LOG = LoggerFactory.getLogger(ShowJobCli.class);
-
- private Options options;
- private ParametersHolder parametersHolder;
-
- public ShowJobCli(ClientContext cliContext) {
- super(cliContext);
- options = generateOptions();
- }
-
- public void printUsages() {
- new HelpFormatter().printHelp("job show", options);
- }
-
- private Options generateOptions() {
- Options options = new Options();
- options.addOption(CliConstants.NAME, true, "Name of the job");
- options.addOption("h", "help", false, "Print help");
- return options;
- }
-
- private void parseCommandLineAndGetShowJobParameters(String[] args)
- throws IOException, YarnException {
- // Do parsing
- GnuParser parser = new GnuParser();
- CommandLine cli;
- try {
- cli = parser.parse(options, args);
- parametersHolder = ParametersHolder
- .createWithCmdLine(cli, Command.SHOW_JOB);
- parametersHolder.updateParameters(clientContext);
- } catch (ParseException e) {
- printUsages();
- }
- }
-
- private void printIfNotNull(String keyForPrint, String keyInStorage,
- Map<String, String> jobInfo) {
- if (jobInfo.containsKey(keyInStorage)) {
- System.out.println("\t" + keyForPrint + ": " + jobInfo.get(keyInStorage));
- }
- }
-
- private void printJobInfo(Map<String, String> jobInfo) {
- System.out.println("Job Meta Info:");
- printIfNotNull("Application Id", StorageKeyConstants.APPLICATION_ID,
- jobInfo);
- printIfNotNull("Input Path", StorageKeyConstants.INPUT_PATH, jobInfo);
- printIfNotNull("Saved Model Path", StorageKeyConstants.SAVED_MODEL_PATH,
- jobInfo);
- printIfNotNull("Checkpoint Path", StorageKeyConstants.CHECKPOINT_PATH,
- jobInfo);
- printIfNotNull("Run Parameters", StorageKeyConstants.JOB_RUN_ARGS,
- jobInfo);
- }
-
- @VisibleForTesting
- protected void getAndPrintJobInfo() throws IOException {
- SubmarineStorage storage =
- clientContext.getRuntimeFactory().getSubmarineStorage();
-
- Map<String, String> jobInfo = null;
- try {
- jobInfo = storage.getJobInfoByName(getParameters().getName());
- } catch (IOException e) {
- LOG.error("Failed to retrieve job info", e);
- throw e;
- }
-
- printJobInfo(jobInfo);
- }
-
- @VisibleForTesting
- public ShowJobParameters getParameters() {
- return (ShowJobParameters) parametersHolder.getParameters();
- }
-
- @Override
- public int run(String[] args)
- throws ParseException, IOException, YarnException, InterruptedException,
- SubmarineException {
- if (CliUtils.argsForHelp(args)) {
- printUsages();
- return 0;
- }
- parseCommandLineAndGetShowJobParameters(args);
- getAndPrintJobInfo();
- return 0;
- }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Localization.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Localization.java
deleted file mode 100644
index 0377450..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Localization.java
+++ /dev/null
@@ -1,137 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param;
-
-import org.apache.commons.cli.ParseException;
-
-import java.util.Arrays;
-import java.util.List;
-
-/**
- * Localization parameter.
- * */
-public class Localization {
-
- private String mountPermissionPattern = "(wr|rw)$";
- /**
- * Regex for directory/file path in container.
- * YARN only support absolute path for mount, but we can
- * support some relative path.
- * For relative path, we only allow ".", "./","./name".
- * relative path like "./a/b" is not allowed.
- * "." and "./" means original dir/file name in container working directory
- * "./name" means use same or new "name" in container working directory
- * A absolute path means same path in container filesystem
- */
- private String localPathPattern = "((^\\.$)|(^\\./$)|(^\\./[^/]+)|(^/.*))";
- private String remoteUri;
- private String localPath;
-
- // Read write by default
- private String mountPermission = "rw";
-
- private static final List<String> SUPPORTED_SCHEME = Arrays.asList(
- "hdfs", "oss", "s3a", "s3n", "wasb",
- "wasbs", "abfs", "abfss", "adl", "har",
- "ftp", "http", "https", "viewfs", "swebhdfs",
- "webhdfs", "swift");
-
- public void parse(String arg) throws ParseException {
- String[] tokens = arg.split(":");
- int minimum = "a:b".split(":").length;
- int minimumWithPermission = "a:b:rw".split(":").length;
- int minimumParts = minimum;
- int miniPartsWithRemoteScheme = "scheme://a:b".split(":").length;
- int maximumParts = "scheme://a:b:rw".split(":").length;
- // If remote uri starts with a remote scheme
- if (isSupportedScheme(tokens[0])) {
- minimumParts = miniPartsWithRemoteScheme;
- }
- if (tokens.length < minimumParts
- || tokens.length > maximumParts) {
- throw new ParseException("Invalid parameter,"
- + "should be \"remoteUri:localPath[:rw|:wr]\" "
- + "format for --localizations");
- }
-
- /**
- * RemoteUri starts with remote scheme.
- * Merge part 0 and 1 to build a hdfs path in token[0].
- * toke[1] will be localPath to ease following logic
- * */
- if (minimumParts == miniPartsWithRemoteScheme) {
- tokens[0] = tokens[0] + ":" + tokens[1];
- tokens[1] = tokens[2];
- if (tokens.length == maximumParts) {
- // Has permission part
- mountPermission = tokens[maximumParts - 1];
- }
- }
- // RemoteUri starts with linux file path
- if (minimumParts == minimum
- && tokens.length == minimumWithPermission) {
- // Has permission part
- mountPermission = tokens[minimumWithPermission - 1];
- }
- remoteUri = tokens[0];
- localPath = tokens[1];
- if (!localPath.matches(localPathPattern)) {
- throw new ParseException("Invalid local file path:"
- + localPath
- + ", it only support \".\", \"./\", \"./name\" and "
- + "absolute path.");
- }
- if (!mountPermission.matches(mountPermissionPattern)) {
- throw new ParseException("Invalid mount permission (ro is not "
- + "supported yet), " + mountPermission);
- }
- }
-
- public String getRemoteUri() {
- return remoteUri;
- }
-
- public Localization setRemoteUri(String rUti) {
- this.remoteUri = rUti;
- return this;
- }
-
- public String getLocalPath() {
- return localPath;
- }
-
- public Localization setLocalPath(String lPath) {
- this.localPath = lPath;
- return this;
- }
-
- public String getMountPermission() {
- return mountPermission;
- }
-
- public Localization setMountPermission(String mPermission) {
- this.mountPermission = mPermission;
- return this;
- }
-
- private boolean isSupportedScheme(String scheme) {
- return SUPPORTED_SCHEME.contains(scheme);
- }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
deleted file mode 100644
index 556dfe6..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ParametersHolder.java
+++ /dev/null
@@ -1,524 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param;
-
-import com.google.common.collect.ImmutableSet;
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.Command;
-import org.apache.submarine.client.cli.param.runjob.MXNetRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.PyTorchRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
-import org.apache.submarine.client.cli.param.yaml.Configs;
-import org.apache.submarine.client.cli.param.yaml.Role;
-import org.apache.submarine.client.cli.param.yaml.Roles;
-import org.apache.submarine.client.cli.param.yaml.Scheduling;
-import org.apache.submarine.client.cli.param.yaml.Security;
-import org.apache.submarine.client.cli.param.yaml.TensorBoard;
-import org.apache.submarine.client.cli.param.yaml.YamlConfigFile;
-import org.apache.submarine.client.cli.param.yaml.YamlParseException;
-import org.apache.submarine.commons.runtime.param.BaseParameters;
-import org.apache.submarine.commons.runtime.Framework;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.param.Parameter;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.stream.Collectors;
-
-import static org.apache.submarine.client.cli.runjob.RunJobCli.YAML_PARSE_FAILED;
-
-/**
- * This class acts as a wrapper of {@code CommandLine} values along with
- * YAML configuration values.
- * YAML configuration is only stored if the -f <filename>
- * option is specified along the CLI arguments.
- * Using this wrapper class makes easy to deal with
- * any form of configuration source potentially added into Submarine,
- * in the future.
- * If both YAML and CLI value is found for a config, this is an error case.
- */
-public final class ParametersHolder implements Parameter {
- private static final Logger LOG =
- LoggerFactory.getLogger(ParametersHolder.class);
-
- public static final String SUPPORTED_FRAMEWORKS_MESSAGE =
- "TensorFlow, PyTorch, MXNet are the only supported frameworks for now!";
- public static final String SUPPORTED_COMMANDS_MESSAGE =
- "'Show job' and 'run job' are the only supported commands for now!";
-
-
-
- private CommandLine parsedCommandLine;
- private Map<String, String> yamlStringConfigs;
- private Map<String, List<String>> yamlListConfigs;
- private ConfigType configType;
- private Command command;
- private final Set onlyDefinedWithCliArgs = ImmutableSet.of(
- CliConstants.VERBOSE);
- private Framework framework;
- private BaseParameters parameters;
-
- private ParametersHolder(CommandLine parsedCommandLine,
- YamlConfigFile yamlConfig, ConfigType configType, Command command)
- throws ParseException, YarnException {
- this.parsedCommandLine = parsedCommandLine;
- this.yamlStringConfigs = initStringConfigValues(yamlConfig);
- this.yamlListConfigs = initListConfigValues(yamlConfig);
- this.configType = configType;
- this.command = command;
- this.framework = determineFrameworkType();
- this.ensureOnlyValidSectionsAreDefined(yamlConfig);
- this.parameters = createParameters();
- }
-
- private ParametersHolder(){
- super();
- }
-
- private BaseParameters createParameters() {
- if (command == Command.RUN_JOB) {
- if (framework == Framework.TENSORFLOW) {
- return new TensorFlowRunJobParameters();
- } else if (framework == Framework.PYTORCH) {
- return new PyTorchRunJobParameters();
- } else if (framework == Framework.MXNET) {
- return new MXNetRunJobParameters();
- } else {
- throw new UnsupportedOperationException(SUPPORTED_FRAMEWORKS_MESSAGE);
- }
- } else if (command == Command.SHOW_JOB) {
- return new ShowJobParameters();
- } else {
- throw new UnsupportedOperationException(SUPPORTED_COMMANDS_MESSAGE);
- }
- }
-
- private void ensureOnlyValidSectionsAreDefined(YamlConfigFile yamlConfig) {
- if (isCommandRunJob() && isFrameworkPyTorch() &&
- isPsSectionDefined(yamlConfig)) {
- throw new YamlParseException(
- "PS section should not be defined when PyTorch " +
- "is the selected framework!");
- }
-
- if (isCommandRunJob() && (isFrameworkPyTorch() || isFrameworkMXNet())
- && isTensorboardSectionDefined(yamlConfig)) {
- throw new YamlParseException(
- "TensorBoard section should not be defined when TensorFlow " +
- "is not the selected framework!");
- }
-
- if (isCommandRunJob() && !isFrameworkMXNet() &&
- isSchedulerSectionDefined(yamlConfig)) {
- throw new YamlParseException(
- "Scheduler section should not be defined when MXNet " +
- "is not the selected framework!");
- }
- }
-
- private boolean isCommandRunJob() {
- return command == Command.RUN_JOB;
- }
-
- private boolean isFrameworkPyTorch() {
- return framework == Framework.PYTORCH;
- }
-
- private boolean isFrameworkMXNet() {
- return framework == Framework.MXNET;
- }
-
- private boolean isPsSectionDefined(YamlConfigFile yamlConfig) {
- return yamlConfig != null &&
- yamlConfig.getRoles() != null &&
- yamlConfig.getRoles().getPs() != null;
- }
-
- private boolean isTensorboardSectionDefined(YamlConfigFile yamlConfig) {
- return yamlConfig != null &&
- yamlConfig.getTensorBoard() != null;
- }
-
- private boolean isSchedulerSectionDefined(YamlConfigFile yamlConfig) {
- return yamlConfig != null &&
- yamlConfig.getRoles() != null &&
- yamlConfig.getRoles().getScheduler() != null;
- }
-
- private Framework determineFrameworkType()
- throws ParseException, YarnException {
- if (!isCommandRunJob()) {
- return null;
- }
- String frameworkStr = getOptionValue(CliConstants.FRAMEWORK);
- if (frameworkStr == null) {
- LOG.info("Framework is not defined in config, falling back to " +
- "TensorFlow as a default.");
- return Framework.TENSORFLOW;
- }
- Framework framework = Framework.parseByValue(frameworkStr);
- if (framework == null) {
- if (getConfigType() == ConfigType.CLI) {
- throw new ParseException("Failed to parse Framework type! "
- + "Valid values are: " + Framework.getValues());
- } else {
- throw new YamlParseException(YAML_PARSE_FAILED +
- ", framework should is defined, but it has an invalid value! " +
- "Valid values are: " + Framework.getValues());
- }
- }
- return framework;
- }
-
- /**
- * Maps every value coming from the passed yamlConfig to {@code CliConstants}.
- * @param yamlConfig Parsed YAML config
- * @return A map of config values, keys are {@code CliConstants}
- * and values are Strings.
- */
- private Map<String, String> initStringConfigValues(
- YamlConfigFile yamlConfig) {
- if (yamlConfig == null) {
- return Collections.emptyMap();
- }
- Map<String, String> yamlConfigValues = Maps.newHashMap();
- Roles roles = yamlConfig.getRoles();
-
- initGenericConfigs(yamlConfig, yamlConfigValues);
- initPs(yamlConfigValues, roles.getPs());
- initWorker(yamlConfigValues, roles.getWorker());
- initScheduler(yamlConfigValues, roles.getScheduler());
- initScheduling(yamlConfigValues, yamlConfig.getScheduling());
- initSecurity(yamlConfigValues, yamlConfig.getSecurity());
- initTensorBoard(yamlConfigValues, yamlConfig.getTensorBoard());
-
- return yamlConfigValues;
- }
-
- private Map<String, List<String>> initListConfigValues(
- YamlConfigFile yamlConfig) {
- if (yamlConfig == null) {
- return Collections.emptyMap();
- }
-
- Map<String, List<String>> yamlConfigValues = Maps.newHashMap();
- Configs configs = yamlConfig.getConfigs();
- yamlConfigValues.put(CliConstants.LOCALIZATION, configs.getLocalizations());
- yamlConfigValues.put(CliConstants.ENV,
- convertToEnvsList(configs.getEnvs()));
- yamlConfigValues.put(CliConstants.QUICKLINK, configs.getQuicklinks());
-
- return yamlConfigValues;
- }
-
- private void initGenericConfigs(YamlConfigFile yamlConfig,
- Map<String, String> yamlConfigs) {
- yamlConfigs.put(CliConstants.NAME, yamlConfig.getSpec().getName());
- yamlConfigs.put(CliConstants.FRAMEWORK,
- yamlConfig.getSpec().getFramework());
-
- Configs configs = yamlConfig.getConfigs();
- yamlConfigs.put(CliConstants.INPUT_PATH, configs.getInputPath());
- yamlConfigs.put(CliConstants.CHECKPOINT_PATH, configs.getCheckpointPath());
- yamlConfigs.put(CliConstants.SAVED_MODEL_PATH, configs.getSavedModelPath());
- yamlConfigs.put(CliConstants.DOCKER_IMAGE, configs.getDockerImage());
- yamlConfigs.put(CliConstants.WAIT_JOB_FINISH, configs.getWaitJobFinish());
- }
-
- private void initPs(Map<String, String> yamlConfigs, Role ps) {
- if (ps == null) {
- return;
- }
- yamlConfigs.put(CliConstants.N_PS, String.valueOf(ps.getReplicas()));
- yamlConfigs.put(CliConstants.PS_RES, ps.getResources());
- yamlConfigs.put(CliConstants.PS_DOCKER_IMAGE, ps.getDockerImage());
- yamlConfigs.put(CliConstants.PS_LAUNCH_CMD, ps.getLaunchCmd());
- }
-
- private void initWorker(Map<String, String> yamlConfigs, Role worker) {
- if (worker == null) {
- return;
- }
- yamlConfigs.put(CliConstants.N_WORKERS,
- String.valueOf(worker.getReplicas()));
- yamlConfigs.put(CliConstants.WORKER_RES, worker.getResources());
- yamlConfigs.put(CliConstants.WORKER_DOCKER_IMAGE, worker.getDockerImage());
- yamlConfigs.put(CliConstants.WORKER_LAUNCH_CMD, worker.getLaunchCmd());
- }
-
- private void initScheduler(Map<String, String> yamlConfigs, Role scheduler) {
- if (scheduler == null) {
- return;
- }
- yamlConfigs.put(CliConstants.N_SCHEDULERS,
- String.valueOf(scheduler.getReplicas()));
- yamlConfigs.put(CliConstants.SCHEDULER_RES, scheduler.getResources());
- yamlConfigs.put(CliConstants.SCHEDULER_DOCKER_IMAGE, scheduler.getDockerImage());
- yamlConfigs.put(CliConstants.SCHEDULER_LAUNCH_CMD, scheduler.getLaunchCmd());
- }
-
- private void initScheduling(Map<String, String> yamlConfigValues,
- Scheduling scheduling) {
- if (scheduling == null) {
- return;
- }
- yamlConfigValues.put(CliConstants.QUEUE, scheduling.getQueue());
- }
-
- private void initSecurity(Map<String, String> yamlConfigValues,
- Security security) {
- if (security == null) {
- return;
- }
- yamlConfigValues.put(CliConstants.KEYTAB, security.getKeytab());
- yamlConfigValues.put(CliConstants.PRINCIPAL, security.getPrincipal());
- yamlConfigValues.put(CliConstants.DISTRIBUTE_KEYTAB,
- String.valueOf(security.isDistributeKeytab()));
- }
-
- private void initTensorBoard(Map<String, String> yamlConfigValues,
- TensorBoard tensorBoard) {
- if (tensorBoard == null) {
- return;
- }
- yamlConfigValues.put(CliConstants.TENSORBOARD, Boolean.TRUE.toString());
- yamlConfigValues.put(CliConstants.TENSORBOARD_DOCKER_IMAGE,
- tensorBoard.getDockerImage());
- yamlConfigValues.put(CliConstants.TENSORBOARD_RESOURCES,
- tensorBoard.getResources());
- }
-
- private List<String> convertToEnvsList(Map<String, String> envs) {
- if (envs == null) {
- return Collections.emptyList();
- }
- return envs.entrySet().stream()
- .map(e -> String.format("%s=%s", e.getKey(), e.getValue()))
- .collect(Collectors.toList());
- }
-
- public static ParametersHolder create() {
- return new ParametersHolder();
- }
-
- public static ParametersHolder createWithCmdLine(CommandLine cli,
- Command command) throws ParseException, YarnException {
- return new ParametersHolder(cli, null, ConfigType.CLI, command);
- }
-
- public static ParametersHolder createWithCmdLineAndYaml(CommandLine cli,
- YamlConfigFile yamlConfig, Command command) throws ParseException,
- YarnException {
- return new ParametersHolder(cli, yamlConfig, ConfigType.YAML, command);
- }
-
- /**
- * Gets the option value, either from the CLI arguments or YAML config,
- * if present.
- * @param option Name of the config.
- * @return The value of the config
- */
- public String getOptionValue(String option) throws YarnException {
- ensureConfigIsDefinedOnce(option, true);
- if (onlyDefinedWithCliArgs.contains(option) ||
- parsedCommandLine.hasOption(option)) {
- return getValueFromCLI(option);
- }
- return getValueFromYaml(option);
- }
-
- /**
- * Gets the option values, either from the CLI arguments or YAML config,
- * if present.
- * @param option Name of the config.
- * @return The values of the config
- */
- public List<String> getOptionValues(String option) throws YarnException {
- ensureConfigIsDefinedOnce(option, false);
- if (onlyDefinedWithCliArgs.contains(option) ||
- parsedCommandLine.hasOption(option)) {
- return getValuesFromCLI(option);
- }
- return getValuesFromYaml(option);
- }
-
- private void ensureConfigIsDefinedOnce(String option, boolean stringValue)
- throws YarnException {
- boolean definedWithYaml;
- if (stringValue) {
- definedWithYaml = yamlStringConfigs.containsKey(option);
- } else {
- definedWithYaml = yamlListConfigs.containsKey(option);
- }
-
- if (parsedCommandLine.hasOption(option) && definedWithYaml) {
- throw new YarnException("Config '%s' is defined both with YAML config" +
- " and with CLI argument, please only use either way!");
- }
- }
-
- private String getValueFromCLI(String option) {
- String value = parsedCommandLine.getOptionValue(option);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found config value {} for key {} " +
- "from CLI configuration.", value, option);
- }
- return value;
- }
-
- private List<String> getValuesFromCLI(String option) {
- String[] optionValues = parsedCommandLine.getOptionValues(option);
- if (optionValues != null) {
- List<String> values = Arrays.asList(optionValues);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found config values {} for key {} " +
- "from CLI configuration.", values, option);
- }
- return values;
- } else {
- if (LOG.isDebugEnabled()) {
- LOG.debug("No config values found for key {} " +
- "from CLI configuration.", option);
- }
- return Lists.newArrayList();
- }
- }
-
- private String getValueFromYaml(String option) {
- String value = yamlStringConfigs.get(option);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found config value {} for key {} " +
- "from YAML configuration.", value, option);
- }
- return value;
- }
-
- private List<String> getValuesFromYaml(String option) {
- List<String> values = yamlListConfigs.get(option);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found config values {} for key {} " +
- "from YAML configuration.", values, option);
- }
- return values;
- }
-
- /**
- * Returns the boolean value of option.
- * First, we check if the CLI value is defined for the option.
- * If not, then we check the YAML value.
- * @param option name of the option
- * @return true, if the option is found in the CLI args or in the YAML config,
- * false otherwise.
- */
- public boolean hasOption(String option) {
- if (onlyDefinedWithCliArgs.contains(option)) {
- boolean value = parsedCommandLine.hasOption(option);
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found boolean config with value {} for key {} " +
- "from CLI configuration.", value, option);
- }
- return value;
- }
- if (parsedCommandLine.hasOption(option)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Found boolean config value for key {} " +
- "from CLI configuration.", option);
- }
- return true;
- }
- return getBooleanValueFromYaml(option);
- }
-
- private boolean getBooleanValueFromYaml(String option) {
- String stringValue = yamlStringConfigs.get(option);
- boolean result = stringValue != null
- && Boolean.valueOf(stringValue).equals(Boolean.TRUE);
- LOG.debug("Found config value {} for key {} " +
- "from YAML configuration.", result, option);
- return result;
- }
-
- public ConfigType getConfigType() {
- return configType;
- }
-
- public Framework getFramework() {
- return framework;
- }
-
- @Override
- public Parameter setFramework(Framework framework) {
- this.framework = framework;
- return this;
- }
-
- public void updateParameters(ClientContext clientContext)
- throws ParseException, YarnException, IOException {
- parameters.updateParameters(this, clientContext);
- }
-
- public BaseParameters getParameters() {
- return parameters;
- }
-
- public Parameter setParameters(BaseParameters parameters) {
- this.parameters = parameters;
- return this;
- }
-
- public CommandLine getParsedCommandLine() {
- return parsedCommandLine;
- }
-
- public Parameter setParsedCommandLine(CommandLine parsedCommandLine) {
- this.parsedCommandLine = parsedCommandLine;
- return this;
- }
-
- public Map<String, String> getYamlStringConfigs() {
- return yamlStringConfigs;
- }
-
- public Parameter setYamlStringConfigs(Map<String, String> yamlStringConfigs) {
- this.yamlStringConfigs = yamlStringConfigs;
- return this;
- }
-
- public Map<String, List<String>> getYamlListConfigs() {
- return yamlListConfigs;
- }
-
- public Parameter setYamlListConfigs(
- Map<String, List<String>> yamlListConfigs) {
- this.yamlListConfigs = yamlListConfigs;
- return this;
- }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Quicklink.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Quicklink.java
deleted file mode 100644
index 4fe035c..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/Quicklink.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param;
-
-import org.apache.commons.cli.ParseException;
-
-/**
- * A class represents quick links to a web page.
- */
-public class Quicklink {
- private String label;
- private String componentInstanceName;
- private String protocol;
- private int port;
-
- public void parse(String quicklinkStr) throws ParseException {
- if (!quicklinkStr.contains("=")) {
- throw new ParseException("Should be <label>=<link> format for quicklink");
- }
-
- int index = quicklinkStr.indexOf("=");
- label = quicklinkStr.substring(0, index);
- quicklinkStr = quicklinkStr.substring(index + 1);
-
- if (quicklinkStr.startsWith("http://")) {
- protocol = "http://";
- } else if (quicklinkStr.startsWith("https://")) {
- protocol = "https://";
- } else {
- throw new ParseException("Quicklink should start with http or https");
- }
-
- quicklinkStr = quicklinkStr.substring(protocol.length());
- index = quicklinkStr.indexOf(":");
-
- if (index == -1) {
- throw new ParseException("Quicklink should be componet-id:port form");
- }
-
- componentInstanceName = quicklinkStr.substring(0, index);
- port = Integer.parseInt(quicklinkStr.substring(index + 1));
- }
-
- public String getLabel() {
- return label;
- }
-
- public Quicklink setLabel(String label) {
- this.label = label;
- return this;
- }
-
- public String getComponentInstanceName() {
- return componentInstanceName;
- }
-
- public Quicklink setComponentInstanceName(String componentInstanceName) {
- this.componentInstanceName = componentInstanceName;
- return this;
- }
-
- public String getProtocol() {
- return protocol;
- }
-
- public Quicklink setProtocol(String protocol) {
- this.protocol = protocol;
- return this;
- }
-
- public int getPort() {
- return port;
- }
-
- public Quicklink setPort(int port) {
- this.port = port;
- return this;
- }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/RunParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/RunParameters.java
deleted file mode 100644
index 2bfbb9e..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/RunParameters.java
+++ /dev/null
@@ -1,111 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.commons.runtime.param.BaseParameters;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.param.Parameter;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Parameters required to run anything on cluster. Such as run job / serve model
- */
-public abstract class RunParameters extends BaseParameters {
- private String savedModelPath;
- private String dockerImageName;
- private List<String> envars = new ArrayList<>();
- private String queue;
-
- @Override
- public void updateParameters(Parameter parametersHolder, ClientContext clientContext)
- throws ParseException,
- IOException, YarnException {
- String savedModelPath = parametersHolder.getOptionValue(
- CliConstants.SAVED_MODEL_PATH);
- this.setSavedModelPath(savedModelPath);
-
- List<String> envVars = getEnvVars((ParametersHolder)parametersHolder);
- this.setEnvars(envVars);
-
- String queue = parametersHolder.getOptionValue(
- CliConstants.QUEUE);
- this.setQueue(queue);
-
- String dockerImage = parametersHolder.getOptionValue(
- CliConstants.DOCKER_IMAGE);
- this.setDockerImageName(dockerImage);
-
- super.updateParameters(parametersHolder, clientContext);
- }
-
- private List<String> getEnvVars(ParametersHolder parametersHolder)
- throws YarnException {
- List<String> result = new ArrayList<>();
- List<String> envVarsArray = parametersHolder.getOptionValues(
- CliConstants.ENV);
- if (envVarsArray != null) {
- result.addAll(envVarsArray);
- }
- return result;
- }
-
- public String getQueue() {
- return queue;
- }
-
- public RunParameters setQueue(String queue) {
- this.queue = queue;
- return this;
- }
-
- public String getDockerImageName() {
- return dockerImageName;
- }
-
- public RunParameters setDockerImageName(String dockerImageName) {
- this.dockerImageName = dockerImageName;
- return this;
- }
-
-
- public List<String> getEnvars() {
- return envars;
- }
-
- public RunParameters setEnvars(List<String> envars) {
- this.envars = envars;
- return this;
- }
-
- public String getSavedModelPath() {
- return savedModelPath;
- }
-
- public RunParameters setSavedModelPath(String savedModelPath) {
- this.savedModelPath = savedModelPath;
- return this;
- }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ShowJobParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ShowJobParameters.java
deleted file mode 100644
index e71b3d7..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/ShowJobParameters.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param;
-
-import org.apache.submarine.commons.runtime.param.BaseParameters;
-
-public class ShowJobParameters extends BaseParameters {
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/MXNetRunJobParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/MXNetRunJobParameters.java
deleted file mode 100644
index 4c4671b..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/MXNetRunJobParameters.java
+++ /dev/null
@@ -1,236 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param.runjob;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.CliUtils;
-import org.apache.submarine.client.cli.runjob.RoleParameters;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.api.MXNetRole;
-import org.apache.submarine.commons.runtime.param.Parameter;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Parameters for MXNet job.
- */
-public class MXNetRunJobParameters extends RunJobParameters {
- private RoleParameters psParameters =
- RoleParameters.createEmpty(MXNetRole.PS);
-
- private RoleParameters schedulerParameters =
- RoleParameters.createEmpty(MXNetRole.SCHEDULER);
-
- private static final String CANNOT_BE_DEFINED_FOR_MXNET =
- "cannot be defined for MXNet jobs!";
-
- @Override
- public void updateParameters(Parameter parametersHolder, ClientContext clientContext)
- throws ParseException, IOException, YarnException {
-
- checkArguments(parametersHolder);
- super.updateParameters(parametersHolder, clientContext);
-
- String input = parametersHolder.getOptionValue(CliConstants.INPUT_PATH);
- this.workerParameters = generateWorkerParameters(clientContext, parametersHolder, input);
- this.psParameters = getPSParameters(parametersHolder);
- this.schedulerParameters = getSchedulerParameters(parametersHolder);
- this.distributed = determineIfDistributed(workerParameters.getReplicas(),
- psParameters.getReplicas(), schedulerParameters.getReplicas());
-
- executePostOperations(clientContext);
- }
-
- @Override
- void executePostOperations(ClientContext clientContext) throws IOException {
- // Set default job dir / saved model dir, etc.
- setDefaultDirs(clientContext);
- replacePatternsInParameters(clientContext);
- }
-
- @Override
- public List<String> getLaunchCommands() {
- return Lists.newArrayList(getWorkerLaunchCmd(), getPSLaunchCmd(), getSchedulerLaunchCmd());
- }
-
- private void replacePatternsInParameters(ClientContext clientContext)
- throws IOException {
- if (StringUtils.isNotEmpty(getPSLaunchCmd())) {
- String afterReplace =
- CliUtils.replacePatternsInLaunchCommand(getPSLaunchCmd(), this,
- clientContext.getRemoteDirectoryManager());
- setPSLaunchCmd(afterReplace);
- }
- if (StringUtils.isNotEmpty(getWorkerLaunchCmd())) {
- String afterReplace =
- CliUtils.replacePatternsInLaunchCommand(getWorkerLaunchCmd(), this,
- clientContext.getRemoteDirectoryManager());
- setWorkerLaunchCmd(afterReplace);
- }
- if (StringUtils.isNotEmpty(getSchedulerLaunchCmd())) {
- String afterReplace =
- CliUtils.replacePatternsInLaunchCommand(getSchedulerLaunchCmd(), this,
- clientContext.getRemoteDirectoryManager());
- setSchedulerLaunchCmd(afterReplace);
- }
- }
-
- private void checkArguments(Parameter parametersHolder)
- throws YarnException, ParseException {
- if (parametersHolder.hasOption(CliConstants.TENSORBOARD)) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.TENSORBOARD));
- } else if (parametersHolder
- .getOptionValue(CliConstants.TENSORBOARD_RESOURCES) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.TENSORBOARD_RESOURCES));
- } else if (parametersHolder
- .getOptionValue(CliConstants.TENSORBOARD_DOCKER_IMAGE) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.TENSORBOARD_DOCKER_IMAGE));
- }
- }
-
- private boolean determineIfDistributed(int nWorkers, int nPS, int nSchedulers) {
- return nWorkers >= 2 && nPS > 0 && nSchedulers == 1;
- }
-
- private String getParamCannotBeDefinedErrorMessage(String cliName) {
- return String.format(
- "Parameter '%s' " + CANNOT_BE_DEFINED_FOR_MXNET, cliName);
- }
-
- private RoleParameters getPSParameters(Parameter parametersHolder)
- throws YarnException, ParseException {
- int nPS = getNumberOfPS(parametersHolder);
- Resource psResource =
- determinePSResource(parametersHolder, nPS);
- String psDockerImage =
- parametersHolder.getOptionValue(CliConstants.PS_DOCKER_IMAGE);
- String psLaunchCommand =
- parametersHolder.getOptionValue(CliConstants.PS_LAUNCH_CMD);
- return new RoleParameters(MXNetRole.PS, nPS, psLaunchCommand,
- psDockerImage, psResource);
- }
-
- private Resource determinePSResource(Parameter parametersHolder, int nPS)
- throws ParseException, YarnException {
- if (nPS > 0) {
- String psResourceStr =
- parametersHolder.getOptionValue(CliConstants.PS_RES);
- if (psResourceStr == null) {
- throw new ParseException("--" + CliConstants.PS_RES + " is absent.");
- }
- return ResourceUtils.createResourceFromString(psResourceStr);
- }
- return null;
- }
-
- public String getPSLaunchCmd() {
- return psParameters.getLaunchCommand();
- }
-
- public void setPSLaunchCmd(String launchCmd) {
- psParameters.setLaunchCommand(launchCmd);
- }
-
- private int getNumberOfPS(Parameter parametersHolder) throws YarnException {
- int nPS = 0;
- if (parametersHolder.getOptionValue(CliConstants.N_PS) != null) {
- nPS = Integer.parseInt(parametersHolder.getOptionValue(CliConstants.N_PS));
- }
- return nPS;
- }
-
- private RoleParameters getSchedulerParameters(Parameter parametersHolder)
- throws YarnException, ParseException {
- int nSchedulers = getNumberOfScheduler(parametersHolder);
- Resource schedulerResource =
- determineSchedulerResource(parametersHolder, nSchedulers);
- String schedulerDockerImage =
- parametersHolder.getOptionValue(CliConstants.SCHEDULER_DOCKER_IMAGE);
- String schedulerLaunchCommand =
- parametersHolder.getOptionValue(CliConstants.SCHEDULER_LAUNCH_CMD);
- return new RoleParameters(MXNetRole.SCHEDULER, nSchedulers, schedulerLaunchCommand,
- schedulerDockerImage, schedulerResource);
- }
-
- private Resource determineSchedulerResource(Parameter parametersHolder, int nSchedulers)
- throws ParseException, YarnException {
- if (nSchedulers > 0) {
- String schedulerResourceStr = parametersHolder.getOptionValue(CliConstants.SCHEDULER_RES);
- if (schedulerResourceStr == null) {
- throw new ParseException("--" + CliConstants.SCHEDULER_RES + " is absent.");
- }
- return ResourceUtils.createResourceFromString(schedulerResourceStr);
- }
- return null;
- }
-
- private int getNumberOfScheduler(Parameter parametersHolder) throws ParseException, YarnException {
- int nSchedulers = 0;
- if (parametersHolder.getOptionValue(CliConstants.N_SCHEDULERS) != null) {
- nSchedulers = Integer.parseInt(parametersHolder.getOptionValue(CliConstants.N_SCHEDULERS));
- if (nSchedulers > 1 || nSchedulers < 0) {
- throw new ParseException("--" + CliConstants.N_SCHEDULERS + " should be 1 or 0");
- }
- }
- return nSchedulers;
- }
-
- public String getSchedulerLaunchCmd() {
- return schedulerParameters.getLaunchCommand();
- }
-
- public void setSchedulerLaunchCmd(String launchCmd) {
- schedulerParameters.setLaunchCommand(launchCmd);
- }
-
- public int getNumPS() {
- return psParameters.getReplicas();
- }
-
- public Resource getPsResource() {
- return psParameters.getResource();
- }
-
- public String getPsDockerImage() {
- return psParameters.getDockerImage();
- }
-
- public int getNumSchedulers() {
- return schedulerParameters.getReplicas();
- }
-
- public Resource getSchedulerResource() {
- return schedulerParameters.getResource();
- }
-
- public String getSchedulerDockerImage() {
- return schedulerParameters.getDockerImage();
- }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/PyTorchRunJobParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/PyTorchRunJobParameters.java
deleted file mode 100644
index ad0b64f..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/PyTorchRunJobParameters.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param.runjob;
-
-import java.io.IOException;
-import java.util.List;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.CliUtils;
-import org.apache.submarine.commons.runtime.ClientContext;
-
-import com.google.common.collect.Lists;
-import org.apache.submarine.commons.runtime.param.Parameter;
-
-/**
- * Parameters for PyTorch job.
- */
-public class PyTorchRunJobParameters extends RunJobParameters {
-
- private static final String CANNOT_BE_DEFINED_FOR_PYTORCH =
- "cannot be defined for PyTorch jobs!";
-
- @Override
- public void updateParameters(Parameter parametersHolder, ClientContext clientContext)
- throws ParseException, IOException, YarnException {
- checkArguments(parametersHolder);
-
- super.updateParameters(parametersHolder, clientContext);
-
- String input = parametersHolder.getOptionValue(CliConstants.INPUT_PATH);
- this.workerParameters =
- generateWorkerParameters(clientContext, parametersHolder, input);
- this.distributed = determineIfDistributed(workerParameters.getReplicas());
- executePostOperations(clientContext);
- }
-
- private void checkArguments(Parameter parametersHolder)
- throws YarnException, ParseException {
- if (parametersHolder.getOptionValue(CliConstants.N_PS) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.N_PS));
- } else if (parametersHolder.getOptionValue(CliConstants.PS_RES) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.PS_RES));
- } else if (parametersHolder
- .getOptionValue(CliConstants.PS_DOCKER_IMAGE) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.PS_DOCKER_IMAGE));
- } else if (parametersHolder
- .getOptionValue(CliConstants.PS_LAUNCH_CMD) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.PS_LAUNCH_CMD));
- } else if (parametersHolder.hasOption(CliConstants.TENSORBOARD)) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.TENSORBOARD));
- } else if (parametersHolder
- .getOptionValue(CliConstants.TENSORBOARD_RESOURCES) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.TENSORBOARD_RESOURCES));
- } else if (parametersHolder
- .getOptionValue(CliConstants.TENSORBOARD_DOCKER_IMAGE) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.TENSORBOARD_DOCKER_IMAGE));
- } else if (parametersHolder.getOptionValue(CliConstants.N_SCHEDULERS) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.N_SCHEDULERS));
- } else if (parametersHolder.getOptionValue(CliConstants.SCHEDULER_RES) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.SCHEDULER_RES));
- } else if (parametersHolder
- .getOptionValue(CliConstants.SCHEDULER_DOCKER_IMAGE) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.SCHEDULER_DOCKER_IMAGE));
- } else if (parametersHolder
- .getOptionValue(CliConstants.SCHEDULER_LAUNCH_CMD) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.SCHEDULER_LAUNCH_CMD));
- }
- }
-
- private String getParamCannotBeDefinedErrorMessage(String cliName) {
- return String.format(
- "Parameter '%s' " + CANNOT_BE_DEFINED_FOR_PYTORCH, cliName);
- }
-
- @Override
- void executePostOperations(ClientContext clientContext) throws IOException {
- // Set default job dir / saved model dir, etc.
- setDefaultDirs(clientContext);
- replacePatternsInParameters(clientContext);
- }
-
- private void replacePatternsInParameters(ClientContext clientContext)
- throws IOException {
- if (StringUtils.isNotEmpty(getWorkerLaunchCmd())) {
- String afterReplace =
- CliUtils.replacePatternsInLaunchCommand(getWorkerLaunchCmd(), this,
- clientContext.getRemoteDirectoryManager());
- setWorkerLaunchCmd(afterReplace);
- }
- }
-
- @Override
- public List<String> getLaunchCommands() {
- return Lists.newArrayList(getWorkerLaunchCmd());
- }
-
- /**
- * We only support non-distributed PyTorch integration for now.
- * @param nWorkers
- * @return
- */
- private boolean determineIfDistributed(int nWorkers) {
- return false;
- }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/RunJobParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/RunJobParameters.java
deleted file mode 100644
index 64b01b1..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/RunJobParameters.java
+++ /dev/null
@@ -1,361 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param.runjob;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.CaseFormat;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.CliUtils;
-import org.apache.submarine.client.cli.param.Localization;
-import org.apache.submarine.client.cli.param.Quicklink;
-import org.apache.submarine.client.cli.param.RunParameters;
-import org.apache.submarine.client.cli.runjob.RoleParameters;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.param.Parameter;
-import org.apache.submarine.commons.runtime.api.TensorFlowRole;
-import org.apache.submarine.commons.runtime.fs.RemoteDirectoryManager;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-import org.yaml.snakeyaml.introspector.Property;
-import org.yaml.snakeyaml.introspector.PropertyUtils;
-
-import java.beans.IntrospectionException;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.List;
-
-/**
- * Parameters used to run a job
- */
-public abstract class RunJobParameters extends RunParameters {
- private String input;
- private String checkpointPath;
-
- private List<Quicklink> quicklinks = new ArrayList<>();
- private List<Localization> localizations = new ArrayList<>();
-
- private boolean waitJobFinish = false;
- protected boolean distributed = false;
-
- private boolean securityDisabled = false;
- private String keytab;
- private String principal;
- private boolean distributeKeytab = false;
- private List<String> confPairs = new ArrayList<>();
-
- RoleParameters workerParameters =
- RoleParameters.createEmpty(TensorFlowRole.WORKER);
-
- @Override
- public void updateParameters(Parameter parametersHolder, ClientContext clientContext)
- throws ParseException, IOException, YarnException {
-
- String input = parametersHolder.getOptionValue(CliConstants.INPUT_PATH);
- String jobDir = parametersHolder.getOptionValue(
- CliConstants.CHECKPOINT_PATH);
-
- if (parametersHolder.hasOption(CliConstants.INSECURE_CLUSTER)) {
- setSecurityDisabled(true);
- }
-
- String kerberosKeytab = parametersHolder.getOptionValue(
- CliConstants.KEYTAB);
- String kerberosPrincipal = parametersHolder.getOptionValue(
- CliConstants.PRINCIPAL);
- CliUtils.doLoginIfSecure(kerberosKeytab, kerberosPrincipal);
-
- if (parametersHolder.hasOption(CliConstants.WAIT_JOB_FINISH)) {
- this.waitJobFinish = true;
- }
-
- // Quicklinks
- List<String> quicklinkStrs = parametersHolder.getOptionValues(
- CliConstants.QUICKLINK);
- if (quicklinkStrs != null) {
- for (String ql : quicklinkStrs) {
- Quicklink quicklink = new Quicklink();
- quicklink.parse(ql);
- quicklinks.add(quicklink);
- }
- }
-
- // Localizations
- List<String> localizationsStr = parametersHolder.getOptionValues(
- CliConstants.LOCALIZATION);
- if (null != localizationsStr) {
- for (String loc : localizationsStr) {
- Localization localization = new Localization();
- localization.parse(loc);
- localizations.add(localization);
- }
- }
- boolean distributeKerberosKeytab = parametersHolder.hasOption(CliConstants
- .DISTRIBUTE_KEYTAB);
-
- List<String> configPairs = parametersHolder
- .getOptionValues(CliConstants.ARG_CONF);
-
- this.setInputPath(input).setCheckpointPath(jobDir)
- .setKeytab(kerberosKeytab)
- .setPrincipal(kerberosPrincipal)
- .setDistributeKeytab(distributeKerberosKeytab)
- .setConfPairs(configPairs);
-
- super.updateParameters(parametersHolder, clientContext);
- }
-
- abstract void executePostOperations(ClientContext clientContext)
- throws IOException;
-
- void setDefaultDirs(ClientContext clientContext) throws IOException {
- // Create directories if needed
- String jobDir = getCheckpointPath();
- if (jobDir == null) {
- jobDir = getJobDir(clientContext);
- setCheckpointPath(jobDir);
- }
-
- if (getNumWorkers() > 0) {
- String savedModelDir = getSavedModelPath();
- if (savedModelDir == null) {
- savedModelDir = jobDir;
- setSavedModelPath(savedModelDir);
- }
- }
- }
-
- private String getJobDir(ClientContext clientContext) throws IOException {
- RemoteDirectoryManager rdm = clientContext.getRemoteDirectoryManager();
- if (getNumWorkers() > 0) {
- return rdm.getJobCheckpointDir(getName(), true).toString();
- } else {
- // when #workers == 0, it means we only launch TB. In that case,
- // point job dir to root dir so all job's metrics will be shown.
- return rdm.getUserRootFolder().toString();
- }
- }
-
- public abstract List<String> getLaunchCommands();
-
- public String getInputPath() {
- return input;
- }
-
- public RunJobParameters setInputPath(String input) {
- this.input = input;
- return this;
- }
-
- public String getCheckpointPath() {
- return checkpointPath;
- }
-
- public RunJobParameters setCheckpointPath(String checkpointPath) {
- this.checkpointPath = checkpointPath;
- return this;
- }
-
- public boolean isWaitJobFinish() {
- return waitJobFinish;
- }
-
- public RunJobParameters setWaitJobFinish(boolean waitJobFinish) {
- this.waitJobFinish = waitJobFinish;
- return this;
- }
-
- public List<Quicklink> getQuicklinks() {
- return quicklinks;
- }
-
- public RunJobParameters setQuicklinks(List<Quicklink> quicklinks) {
- this.quicklinks = quicklinks;
- return this;
- }
-
- public List<Localization> getLocalizations() {
- return localizations;
- }
-
- public RunJobParameters setLocalizations(List<Localization> localizations) {
- this.localizations = localizations;
- return this;
- }
-
- public String getKeytab() {
- return keytab;
- }
-
- public RunJobParameters setKeytab(String kerberosKeytab) {
- this.keytab = kerberosKeytab;
- return this;
- }
-
- public String getPrincipal() {
- return principal;
- }
-
- public RunJobParameters setPrincipal(String kerberosPrincipal) {
- this.principal = kerberosPrincipal;
- return this;
- }
-
- public boolean isSecurityDisabled() {
- return securityDisabled;
- }
-
- public RunJobParameters setSecurityDisabled(boolean securityDisabled) {
- this.securityDisabled = securityDisabled;
- return this;
- }
-
- public boolean isDistributeKeytab() {
- return distributeKeytab;
- }
-
- public RunJobParameters setDistributeKeytab(
- boolean distributeKerberosKeytab) {
- this.distributeKeytab = distributeKerberosKeytab;
- return this;
- }
-
- public List<String> getConfPairs() {
- return confPairs;
- }
-
- public RunJobParameters setConfPairs(List<String> confPairs) {
- this.confPairs = confPairs;
- return this;
- }
-
- public RunJobParameters setDistributed(boolean distributed) {
- this.distributed = distributed;
- return this;
- }
-
- RoleParameters generateWorkerParameters(ClientContext clientContext,
- Parameter parametersHolder, String input)
- throws ParseException, YarnException, IOException {
- int nWorkers = getNumberOfWorkers(parametersHolder, input);
- Resource workerResource =
- determineWorkerResource(parametersHolder, nWorkers, clientContext);
- String workerDockerImage =
- parametersHolder.getOptionValue(CliConstants.WORKER_DOCKER_IMAGE);
- String workerLaunchCmd =
- parametersHolder.getOptionValue(CliConstants.WORKER_LAUNCH_CMD);
- return new RoleParameters(TensorFlowRole.WORKER, nWorkers,
- workerLaunchCmd, workerDockerImage, workerResource);
- }
-
- private Resource determineWorkerResource(Parameter parametersHolder,
- int nWorkers, ClientContext clientContext)
- throws ParseException, YarnException, IOException {
- if (nWorkers > 0) {
- String workerResourceStr =
- parametersHolder.getOptionValue(CliConstants.WORKER_RES);
- if (workerResourceStr == null) {
- throw new ParseException(
- "--" + CliConstants.WORKER_RES + " is absent.");
- }
- return ResourceUtils.createResourceFromString(workerResourceStr);
- }
- return null;
- }
-
- private int getNumberOfWorkers(Parameter parametersHolder,
- String input) throws ParseException, YarnException {
- int nWorkers = 1;
- if (parametersHolder.getOptionValue(CliConstants.N_WORKERS) != null) {
- nWorkers = Integer
- .parseInt(parametersHolder.getOptionValue(CliConstants.N_WORKERS));
- // Only check null value.
- // Training job shouldn't ignore INPUT_PATH option
- // But if nWorkers is 0, INPUT_PATH can be ignored because
- // user can only run Tensorboard
- if (null == input && 0 != nWorkers) {
- throw new ParseException(
- "\"--" + CliConstants.INPUT_PATH + "\" is absent");
- }
- }
- return nWorkers;
- }
-
- public RoleParameters getWorkerParameter() {
- return workerParameters;
- }
-
- public RunJobParameters setWorkerParameter(RoleParameters workerParameters) {
- this.workerParameters = workerParameters;
- return this;
- }
-
- public String getWorkerLaunchCmd() {
- return workerParameters.getLaunchCommand();
- }
-
- public void setWorkerLaunchCmd(String launchCmd) {
- workerParameters.setLaunchCommand(launchCmd);
- }
-
- public int getNumWorkers() {
- return workerParameters.getReplicas();
- }
-
- public void setNumWorkers(int numWorkers) {
- workerParameters.setReplicas(numWorkers);
- }
-
- public Resource getWorkerResource() {
- return workerParameters.getResource();
- }
-
- public void setWorkerResource(Resource resource) {
- workerParameters.setResource(resource);
- }
-
- public String getWorkerDockerImage() {
- return workerParameters.getDockerImage();
- }
-
- public void setWorkerDockerImage(String image) {
- workerParameters.setDockerImage(image);
- }
-
- public boolean isDistributed() {
- return distributed;
- }
-
- @VisibleForTesting
- public static class UnderscoreConverterPropertyUtils extends PropertyUtils {
- @Override
- public Property getProperty(Class<? extends Object> type, String name) {
- if (name.indexOf('_') > -1) {
- name = convertName(name);
- }
- return super.getProperty(type, name);
- }
-
- private static String convertName(String name) {
- return CaseFormat.UPPER_UNDERSCORE.to(CaseFormat.LOWER_CAMEL, name);
- }
- }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/TensorFlowRunJobParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/TensorFlowRunJobParameters.java
deleted file mode 100644
index 925c3f3..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/param/runjob/TensorFlowRunJobParameters.java
+++ /dev/null
@@ -1,261 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.param.runjob;
-
-import com.google.common.collect.Lists;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.CliUtils;
-import org.apache.submarine.client.cli.runjob.RoleParameters;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.param.Parameter;
-import org.apache.submarine.commons.runtime.api.TensorFlowRole;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-
-import java.io.IOException;
-import java.util.List;
-
-/**
- * Parameters for TensorFlow job.
- */
-public class TensorFlowRunJobParameters extends RunJobParameters {
- private boolean tensorboardEnabled;
- private static final String CANNOT_BE_DEFINED_FOR_TF =
- "cannot be defined for TensorFlow jobs!";
- private RoleParameters psParameters =
- RoleParameters.createEmpty(TensorFlowRole.PS);
- private RoleParameters tensorBoardParameters =
- RoleParameters.createEmpty(TensorFlowRole.TENSORBOARD);
-
- @Override
- public void updateParameters(Parameter parametersHolder, ClientContext clientContext)
- throws ParseException, IOException, YarnException {
- checkArguments(parametersHolder);
- super.updateParameters(parametersHolder, clientContext);
-
- String input = parametersHolder.getOptionValue(CliConstants.INPUT_PATH);
- this.workerParameters =
- generateWorkerParameters(clientContext, parametersHolder, input);
- this.psParameters = getPSParameters(clientContext, parametersHolder);
- this.distributed = determineIfDistributed(workerParameters.getReplicas(),
- psParameters.getReplicas());
-
- if (parametersHolder.hasOption(CliConstants.TENSORBOARD)) {
- this.tensorboardEnabled = true;
- this.tensorBoardParameters =
- getTensorBoardParameters(parametersHolder, clientContext);
- }
- executePostOperations(clientContext);
- }
-
- @Override
- void executePostOperations(ClientContext clientContext) throws IOException {
- // Set default job dir / saved model dir, etc.
- setDefaultDirs(clientContext);
- replacePatternsInParameters(clientContext);
- }
-
- private void checkArguments(Parameter parametersHolder)
- throws YarnException, ParseException {
- if (parametersHolder.getOptionValue(CliConstants.N_SCHEDULERS) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.N_SCHEDULERS));
- } else if (parametersHolder.getOptionValue(CliConstants.SCHEDULER_RES) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.SCHEDULER_RES));
- } else if (parametersHolder
- .getOptionValue(CliConstants.SCHEDULER_DOCKER_IMAGE) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.SCHEDULER_DOCKER_IMAGE));
- } else if (parametersHolder
- .getOptionValue(CliConstants.SCHEDULER_LAUNCH_CMD) != null) {
- throw new ParseException(getParamCannotBeDefinedErrorMessage(
- CliConstants.SCHEDULER_LAUNCH_CMD));
- }
- }
-
- private String getParamCannotBeDefinedErrorMessage(String cliName) {
- return String.format(
- "Parameter '%s' " + CANNOT_BE_DEFINED_FOR_TF, cliName);
- }
-
- private void replacePatternsInParameters(ClientContext clientContext)
- throws IOException {
- if (StringUtils.isNotEmpty(getPSLaunchCmd())) {
- String afterReplace = CliUtils.replacePatternsInLaunchCommand(
- getPSLaunchCmd(), this, clientContext.getRemoteDirectoryManager());
- setPSLaunchCmd(afterReplace);
- }
-
- if (StringUtils.isNotEmpty(getWorkerLaunchCmd())) {
- String afterReplace =
- CliUtils.replacePatternsInLaunchCommand(getWorkerLaunchCmd(), this,
- clientContext.getRemoteDirectoryManager());
- setWorkerLaunchCmd(afterReplace);
- }
- }
-
- @Override
- public List<String> getLaunchCommands() {
- return Lists.newArrayList(getWorkerLaunchCmd(), getPSLaunchCmd());
- }
-
- private boolean determineIfDistributed(int nWorkers, int nPS)
- throws ParseException {
- // Check #workers and #ps.
- // When distributed training is required
- if (nWorkers >= 2 && nPS > 0) {
- return true;
- } else if (nWorkers <= 1 && nPS > 0) {
- throw new ParseException("Only specified one worker but non-zero PS, "
- + "please double check.");
- }
- return false;
- }
-
- private RoleParameters getPSParameters(ClientContext clientContext,
- Parameter parametersHolder)
- throws YarnException, IOException, ParseException {
- int nPS = getNumberOfPS(parametersHolder);
- Resource psResource =
- determinePSResource(parametersHolder, nPS, clientContext);
- String psDockerImage =
- parametersHolder.getOptionValue(CliConstants.PS_DOCKER_IMAGE);
- String psLaunchCommand =
- parametersHolder.getOptionValue(CliConstants.PS_LAUNCH_CMD);
- return new RoleParameters(TensorFlowRole.PS, nPS, psLaunchCommand,
- psDockerImage, psResource);
- }
-
- private Resource determinePSResource(Parameter parametersHolder,
- int nPS, ClientContext clientContext)
- throws ParseException, YarnException, IOException {
- if (nPS > 0) {
- String psResourceStr =
- parametersHolder.getOptionValue(CliConstants.PS_RES);
- if (psResourceStr == null) {
- throw new ParseException("--" + CliConstants.PS_RES + " is absent.");
- }
- return ResourceUtils.createResourceFromString(psResourceStr);
- }
- return null;
- }
-
- private int getNumberOfPS(Parameter parametersHolder)
- throws YarnException {
- int nPS = 0;
- if (parametersHolder.getOptionValue(CliConstants.N_PS) != null) {
- nPS =
- Integer.parseInt(parametersHolder.getOptionValue(CliConstants.N_PS));
- }
- return nPS;
- }
-
- private RoleParameters getTensorBoardParameters(Parameter parametersHolder,
- ClientContext clientContext) throws YarnException, IOException {
- String tensorboardResourceStr =
- parametersHolder.getOptionValue(CliConstants.TENSORBOARD_RESOURCES);
- if (tensorboardResourceStr == null || tensorboardResourceStr.isEmpty()) {
- tensorboardResourceStr = CliConstants.TENSORBOARD_DEFAULT_RESOURCES;
- }
- Resource tensorboardResource = ResourceUtils.createResourceFromString(
- tensorboardResourceStr);
- String tensorboardDockerImage =
- parametersHolder.getOptionValue(CliConstants.TENSORBOARD_DOCKER_IMAGE);
- return new RoleParameters(TensorFlowRole.TENSORBOARD, 1, null,
- tensorboardDockerImage, tensorboardResource);
- }
-
- public RoleParameters getPsParameters() {
- return psParameters;
- }
-
- public void setPsParameters(RoleParameters parameters) {
- this.psParameters = parameters;
- }
-
- public int getNumPS() {
- return psParameters.getReplicas();
- }
-
- public void setNumPS(int numPS) {
- psParameters.setReplicas(numPS);
- }
-
- public Resource getPsResource() {
- return psParameters.getResource();
- }
-
- public void setPsResource(Resource resource) {
- psParameters.setResource(resource);
- }
-
- public String getPsDockerImage() {
- return psParameters.getDockerImage();
- }
-
- public void setPsDockerImage(String image) {
- psParameters.setDockerImage(image);
- }
-
- public String getPSLaunchCmd() {
- return psParameters.getLaunchCommand();
- }
-
- public void setPSLaunchCmd(String launchCmd) {
- psParameters.setLaunchCommand(launchCmd);
- }
-
- public RoleParameters getTensorBoardParameters() {
- return tensorBoardParameters;
- }
-
- public void setTensorBoardParameters(RoleParameters tensorBoardParameters) {
- this.tensorBoardParameters = tensorBoardParameters;
- }
-
- public boolean isTensorboardEnabled() {
- return tensorboardEnabled;
- }
-
- public void setTensorboardEnabled(boolean tensorboardEnabled) {
- this.tensorboardEnabled = tensorboardEnabled;
- }
-
- public Resource getTensorboardResource() {
- return tensorBoardParameters.getResource();
- }
-
- public void setTensorboardResource(Resource resource) {
- tensorBoardParameters.setResource(resource);
- }
-
- public String getTensorboardDockerImage() {
- return tensorBoardParameters.getDockerImage();
- }
-
- public void setTensorboardDockerImage(String image) {
- tensorBoardParameters.setDockerImage(image);
- }
-
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RoleParameters.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RoleParameters.java
deleted file mode 100644
index fe15d29..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RoleParameters.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.submarine.commons.runtime.api.Role;
-
-/**
- * This class encapsulates data related to a particular Role.
- * Some examples: TF Worker process, TF PS process or a PyTorch worker process.
- */
-public class RoleParameters {
- private final Role role;
- private int replicas;
- private String launchCommand;
- private String dockerImage;
- private Resource resource;
-
- public RoleParameters(Role role, int replicas,
- String launchCommand, String dockerImage, Resource resource) {
- this.role = role;
- this.replicas = replicas;
- this.launchCommand = launchCommand;
- this.dockerImage = dockerImage;
- this.resource = resource;
- }
-
- public static RoleParameters createEmpty(Role role) {
- return new RoleParameters(role, 0, null, null, null);
- }
-
- public Role getRole() {
- return role;
- }
-
- public int getReplicas() {
- return replicas;
- }
-
- public String getLaunchCommand() {
- return launchCommand;
- }
-
- public RoleParameters setLaunchCommand(String launchCommand) {
- this.launchCommand = launchCommand;
- return this;
- }
-
- public String getDockerImage() {
- return dockerImage;
- }
-
- public RoleParameters setDockerImage(String dockerImage) {
- this.dockerImage = dockerImage;
- return this;
- }
-
- public Resource getResource() {
- return resource;
- }
-
- public RoleParameters setResource(Resource resource) {
- this.resource = resource;
- return this;
- }
-
- public RoleParameters setReplicas(int replicas) {
- this.replicas = replicas;
- return this;
- }
-}
diff --git a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java b/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java
deleted file mode 100644
index 5c1bec0..0000000
--- a/submarine-client/src/main/java/org/apache/submarine/client/cli/runjob/RunJobCli.java
+++ /dev/null
@@ -1,371 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.cli.CommandLine;
-import org.apache.commons.cli.GnuParser;
-import org.apache.commons.cli.HelpFormatter;
-import org.apache.commons.cli.Options;
-import org.apache.commons.cli.ParseException;
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.AbstractCli;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.CliUtils;
-import org.apache.submarine.client.cli.Command;
-import org.apache.submarine.client.cli.param.ParametersHolder;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.yaml.YamlConfigFile;
-import org.apache.submarine.client.cli.param.yaml.YamlParseException;
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.Framework;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-import org.apache.submarine.commons.runtime.JobMonitor;
-import org.apache.submarine.commons.runtime.JobSubmitter;
-import org.apache.submarine.commons.runtime.fs.StorageKeyConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.constructor.Constructor;
-
-import java.io.File;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * This purpose of this class is to handle / parse CLI arguments related to
- * the run job Submarine command.
- */
-public class RunJobCli extends AbstractCli {
- private static final Logger LOG =
- LoggerFactory.getLogger(RunJobCli.class);
- private static final String CAN_BE_USED_WITH_TF_PYTORCH_MXNET =
- "Can be used with TensorFlow or PyTorch or MXNet frameworks.";
- private static final String CAN_BE_USED_WITH_TF_MXNET =
- "Can be used with TensorFlow or MXNet frameworks.";
- private static final String CAN_BE_USED_WITH_TF_ONLY =
- "Can only be used with TensorFlow framework.";
- private static final String CAN_BE_USED_WITH_MXNET_ONLY =
- "Can only be used with MXNet framework.";
- public static final String YAML_PARSE_FAILED = "Failed to parse " +
- "YAML config";
-
-
- private Options options;
- private JobSubmitter jobSubmitter;
- private JobMonitor jobMonitor;
- private ParametersHolder parametersHolder;
-
- public RunJobCli(ClientContext cliContext) {
- this(cliContext, cliContext.getRuntimeFactory().getJobSubmitterInstance(),
- cliContext.getRuntimeFactory().getJobMonitorInstance());
- }
-
- @VisibleForTesting
- public RunJobCli(ClientContext cliContext, JobSubmitter jobSubmitter,
- JobMonitor jobMonitor) {
- super(cliContext);
- this.options = generateOptions();
- this.jobSubmitter = jobSubmitter;
- this.jobMonitor = jobMonitor;
- }
-
- public void printUsages() {
- new HelpFormatter().printHelp("job run", options);
- }
-
- private Options generateOptions() {
- Options options = new Options();
- options.addOption(CliConstants.YAML_CONFIG, true,
- "Config file (in YAML format)");
- options.addOption(CliConstants.FRAMEWORK, true,
- String.format("Framework to use. Valid values are: %s! " +
- "The default framework is Tensorflow.",
- Framework.getValues()));
- options.addOption(CliConstants.NAME, true, "Name of the job");
- options.addOption(CliConstants.INPUT_PATH, true,
- "Input of the job, could be local or other FS directory");
- options.addOption(CliConstants.CHECKPOINT_PATH, true,
- "Training output directory of the job, "
- + "could be local or other FS directory. This typically includes "
- + "checkpoint files and exported model ");
- options.addOption(CliConstants.SAVED_MODEL_PATH, true,
- "Model exported path (savedmodel) of the job, which is needed when "
- + "exported model is not placed under ${checkpoint_path}"
- + "could be local or other FS directory. " +
- "This will be used to serve.");
- options.addOption(CliConstants.DOCKER_IMAGE, true, "Docker image name/tag");
- options.addOption(CliConstants.QUEUE, true,
- "Name of queue to run the job, by default it uses default queue");
-
- addWorkerOptions(options);
- addPSOptions(options);
- addSchedulerOptions(options);
- addTensorboardOptions(options);
-
- options.addOption(CliConstants.ENV, true,
- "Common environment variable of worker/ps");
- options.addOption(CliConstants.VERBOSE, false,
- "Print verbose log for troubleshooting");
- options.addOption(CliConstants.WAIT_JOB_FINISH, false,
- "Specified when user want to wait the job finish");
- options.addOption(CliConstants.QUICKLINK, true, "Specify quicklink so YARN"
- + "web UI shows link to given role instance and port. When "
- + "--tensorboard is specified, quicklink to tensorboard instance will "
- + "be added automatically. The format of quick link is: "
- + "Quick_link_label=http(or https)://role-name:port. For example, "
- + "if want to link to first worker's 7070 port, and text of quicklink "
- + "is Notebook_UI, user need to specify --quicklink "
- + "Notebook_UI=https://master-0:7070");
- options.addOption(CliConstants.LOCALIZATION, true, "Specify"
- + " localization to make remote/local file/directory available to"
- + " all container(Docker)."
- + " Argument format is \"RemoteUri:LocalFilePath[:rw] \" (ro"
- + " permission is not supported yet)"
- + " The RemoteUri can be a file or directory in local or"
- + " HDFS or s3 or abfs or http .etc."
- + " The LocalFilePath can be absolute or relative."
- + " If it's a relative path, it'll be"
- + " under container's implied working directory"
- + " but sub directory is not supported yet."
- + " This option can be set multiple times."
- + " Examples are \n"
- + "-localization \"hdfs:///user/yarn/mydir2:/opt/data\"\n"
- + "-localization \"s3a:///a/b/myfile1:./\"\n"
- + "-localization \"https:///a/b/myfile2:./myfile\"\n"
- + "-localization \"/user/yarn/mydir3:/opt/mydir3\"\n"
- + "-localization \"./mydir1:.\"\n");
- options.addOption(CliConstants.KEYTAB, true, "Specify keytab used by the " +
- "job under security environment");
- options.addOption(CliConstants.PRINCIPAL, true, "Specify principal used " +
- "by the job under security environment");
- options.addOption(CliConstants.DISTRIBUTE_KEYTAB, false, "Distribute " +
- "local keytab to cluster machines for service authentication. If not " +
- "specified, pre-distributed keytab of which path specified by" +
- " parameter" + CliConstants.KEYTAB + " on cluster machines will be " +
- "used");
- options.addOption("h", "help", false, "Print help");
- options.addOption("insecure", false, "Cluster is not Kerberos enabled.");
- options.addOption("conf", true,
- "User specified configuration, as key=val pairs.");
- return options;
- }
-
- private void addWorkerOptions(Options options) {
- options.addOption(CliConstants.N_WORKERS, true,
- "Number of worker tasks of the job, by default it's 1." +
- CAN_BE_USED_WITH_TF_PYTORCH_MXNET);
- options.addOption(CliConstants.WORKER_DOCKER_IMAGE, true,
- "Specify docker image for WORKER, when this is not specified, WORKER "
- + "uses --" + CliConstants.DOCKER_IMAGE + " as default." +
- CAN_BE_USED_WITH_TF_PYTORCH_MXNET);
- options.addOption(CliConstants.WORKER_LAUNCH_CMD, true,
- "Commandline of worker, arguments will be "
- + "directly used to launch the worker" +
- CAN_BE_USED_WITH_TF_PYTORCH_MXNET);
- options.addOption(CliConstants.WORKER_RES, true,
- "Resource of each worker, for example "
- + "memory-mb=2048,vcores=2,yarn.io/gpu=2" +
- CAN_BE_USED_WITH_TF_PYTORCH_MXNET);
- }
-
- private void addPSOptions(Options options) {
- options.addOption(CliConstants.N_PS, true,
- "Number of PS tasks of the job, by default it's 0. " +
- CAN_BE_USED_WITH_TF_MXNET);
- options.addOption(CliConstants.PS_DOCKER_IMAGE, true,
- "Specify docker image for PS, when this is not specified, PS uses --"
- + CliConstants.DOCKER_IMAGE + " as default." +
- CAN_BE_USED_WITH_TF_MXNET);
- options.addOption(CliConstants.PS_LAUNCH_CMD, true,
- "Commandline of PS, arguments will be "
- + "directly used to launch the PS" +
- CAN_BE_USED_WITH_TF_MXNET);
- options.addOption(CliConstants.PS_RES, true,
- "Resource of each PS, for example "
- + "memory-mb=2048,vcores=2,yarn.io/gpu=2" +
- CAN_BE_USED_WITH_TF_MXNET);
- }
-
- private void addTensorboardOptions(Options options) {
- options.addOption(CliConstants.TENSORBOARD, false,
- "Should we run TensorBoard"
- + " for this job? By default it's disabled." +
- CAN_BE_USED_WITH_TF_ONLY);
- options.addOption(CliConstants.TENSORBOARD_RESOURCES, true,
- "Specify resources of Tensorboard, by default it is "
- + CliConstants.TENSORBOARD_DEFAULT_RESOURCES + "." +
- CAN_BE_USED_WITH_TF_ONLY);
- options.addOption(CliConstants.TENSORBOARD_DOCKER_IMAGE, true,
- "Specify Tensorboard docker image. when this is not "
- + "specified, Tensorboard " + "uses --" + CliConstants.DOCKER_IMAGE
- + " as default." +
- CAN_BE_USED_WITH_TF_ONLY);
- }
-
- private void addSchedulerOptions(Options options) {
- options.addOption(CliConstants.N_SCHEDULERS, true,
- "Number of scheduler tasks of the job. " +
- "It should be 1 or 0, by default it's 0."+
- CAN_BE_USED_WITH_MXNET_ONLY);
- options.addOption(CliConstants.SCHEDULER_DOCKER_IMAGE, true,
- "Specify docker image for scheduler, when this is not specified, " +
- "scheduler uses --" + CliConstants.DOCKER_IMAGE +
- " as default. " + CAN_BE_USED_WITH_MXNET_ONLY);
- options.addOption(CliConstants.SCHEDULER_LAUNCH_CMD, true,
- "Commandline of scheduler, arguments will be " +
- "directly used to launch the scheduler. " + CAN_BE_USED_WITH_MXNET_ONLY);
- options.addOption(CliConstants.SCHEDULER_RES, true,
- "Resource of each scheduler, for example " +
- "memory-mb=2048,vcores=2. " + CAN_BE_USED_WITH_MXNET_ONLY);
- }
-
- private void parseCommandLineAndGetRunJobParameters(String[] args)
- throws ParseException, IOException, YarnException {
- try {
- GnuParser parser = new GnuParser();
- CommandLine cli = parser.parse(options, args);
- parametersHolder = createParametersHolder(cli);
- parametersHolder.updateParameters(clientContext);
- } catch (ParseException e) {
- LOG.error("Exception in parse: {}", e.getMessage());
- printUsages();
- throw e;
- }
- }
-
- private ParametersHolder createParametersHolder(CommandLine cli)
- throws ParseException, YarnException {
- String yamlConfigFile =
- cli.getOptionValue(CliConstants.YAML_CONFIG);
- if (yamlConfigFile != null) {
- YamlConfigFile yamlConfig = readYamlConfigFile(yamlConfigFile);
- checkYamlConfig(yamlConfigFile, yamlConfig);
- LOG.info("Using YAML configuration!");
- return ParametersHolder.createWithCmdLineAndYaml(cli, yamlConfig,
- Command.RUN_JOB);
- } else {
- LOG.info("Using CLI configuration!");
- return ParametersHolder.createWithCmdLine(cli, Command.RUN_JOB);
- }
- }
-
- private void checkYamlConfig(String yamlConfigFile,
- YamlConfigFile yamlConfig) {
- if (yamlConfig == null) {
- throw new YamlParseException(String.format(
- YAML_PARSE_FAILED + ", file is empty: %s", yamlConfigFile));
- } else if (yamlConfig.getConfigs() == null) {
- throw new YamlParseException(String.format(YAML_PARSE_FAILED +
- ", config section should be defined, but it cannot be found in " +
- "YAML file '%s'!", yamlConfigFile));
- }
- }
-
- private YamlConfigFile readYamlConfigFile(String filename) {
- Constructor constructor = new Constructor(YamlConfigFile.class);
- constructor.setPropertyUtils(new RunJobParameters.UnderscoreConverterPropertyUtils());
- try {
- LOG.info("Reading YAML configuration from file: {}", filename);
- Yaml yaml = new Yaml(constructor);
- return yaml.loadAs(FileUtils.openInputStream(new File(filename)),
- YamlConfigFile.class);
- } catch (FileNotFoundException e) {
- logExceptionOfYamlParse(filename, e);
- throw new YamlParseException(YAML_PARSE_FAILED +
- ", file does not exist!");
- } catch (Exception e) {
- logExceptionOfYamlParse(filename, e);
- throw new YamlParseException(
- String.format(YAML_PARSE_FAILED + ", details: %s", e.getMessage()));
- }
- }
-
- private void logExceptionOfYamlParse(String filename, Exception e) {
- LOG.error(String.format("Exception while parsing YAML file %s", filename),
- e);
- }
-
- private void storeJobInformation(RunJobParameters parameters,
- ApplicationId applicationId, String[] args) throws IOException {
- String jobName = parameters.getName();
- Map<String, String> jobInfo = new HashMap<>();
- jobInfo.put(StorageKeyConstants.JOB_NAME, jobName);
- jobInfo.put(StorageKeyConstants.APPLICATION_ID, applicationId.toString());
-
- if (parameters.getCheckpointPath() != null) {
- jobInfo.put(StorageKeyConstants.CHECKPOINT_PATH,
- parameters.getCheckpointPath());
- }
- if (parameters.getInputPath() != null) {
- jobInfo.put(StorageKeyConstants.INPUT_PATH,
- parameters.getInputPath());
- }
- if (parameters.getSavedModelPath() != null) {
- jobInfo.put(StorageKeyConstants.SAVED_MODEL_PATH,
- parameters.getSavedModelPath());
- }
-
- String joinedArgs = String.join(" ", args);
- jobInfo.put(StorageKeyConstants.JOB_RUN_ARGS, joinedArgs);
- clientContext.getRuntimeFactory().getSubmarineStorage().addNewJob(jobName,
- jobInfo);
- }
-
- @Override
- public int run(String[] args)
- throws ParseException, IOException, YarnException, SubmarineException {
- if (CliUtils.argsForHelp(args)) {
- printUsages();
- return 0;
- }
-
- parseCommandLineAndGetRunJobParameters(args);
- ApplicationId applicationId = jobSubmitter.submitJob(parametersHolder);
- LOG.info("Submarine job is submitted, the job id is " + applicationId);
- RunJobParameters parameters =
- (RunJobParameters) parametersHolder.getParameters();
- storeJobInformation(parameters, applicationId, args);
- if (parameters.isWaitJobFinish()) {
- this.jobMonitor.waitTrainingFinal(parameters.getName());
- }
-
- return 0;
- }
-
- @VisibleForTesting
- public JobSubmitter getJobSubmitter() {
- return jobSubmitter;
- }
-
- @VisibleForTesting
- public RunJobParameters getRunJobParameters() {
- return (RunJobParameters) parametersHolder.getParameters();
- }
-
- @VisibleForTesting
- public ParametersHolder getParametersHolder() {
- return parametersHolder;
- }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/ShowJobCliParsingTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/ShowJobCliParsingTest.java
deleted file mode 100644
index 9bbbbe3..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/ShowJobCliParsingTest.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.param.ShowJobParameters;
-import org.apache.submarine.commons.runtime.MockClientContext;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-import org.apache.submarine.commons.runtime.RuntimeFactory;
-import org.apache.submarine.commons.runtime.fs.MemorySubmarineStorage;
-import org.apache.submarine.commons.runtime.fs.StorageKeyConstants;
-import org.apache.submarine.commons.runtime.fs.SubmarineStorage;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class ShowJobCliParsingTest {
- @Before
- public void before() {
- SubmarineLogs.verboseOff();
- }
-
- @Test
- public void testPrintHelp() throws IOException {
- MockClientContext mockClientContext = new MockClientContext("testJob");
- ShowJobCli showJobCli = new ShowJobCli(mockClientContext);
- showJobCli.printUsages();
- }
-
- @Test
- public void testShowJob()
- throws InterruptedException, SubmarineException, YarnException,
- ParseException, IOException {
- MockClientContext mockClientContext = new MockClientContext("testJob");
- ShowJobCli showJobCli = new ShowJobCli(mockClientContext) {
- @Override
- protected void getAndPrintJobInfo() {
- // do nothing
- }
- };
- showJobCli.run(new String[] { "--name", "my-job" });
- ShowJobParameters parameters = showJobCli.getParameters();
- Assert.assertEquals(parameters.getName(), "my-job");
- }
-
- private Map<String, String> getMockJobInfo(String jobName) {
- Map<String, String> map = new HashMap<>();
- map.put(StorageKeyConstants.APPLICATION_ID,
- ApplicationId.newInstance(1234L, 1).toString());
- map.put(StorageKeyConstants.JOB_RUN_ARGS, "job run 123456");
- map.put(StorageKeyConstants.INPUT_PATH, "hdfs://" + jobName);
- return map;
- }
-
- @Test
- public void testSimpleShowJob()
- throws InterruptedException, SubmarineException, YarnException,
- ParseException, IOException {
- SubmarineStorage storage = new MemorySubmarineStorage();
- MockClientContext mockClientContext = new MockClientContext("testJob");
- RuntimeFactory runtimeFactory = mock(RuntimeFactory.class);
- when(runtimeFactory.getSubmarineStorage()).thenReturn(storage);
- mockClientContext.setRuntimeFactory(runtimeFactory);
-
- ShowJobCli showJobCli = new ShowJobCli(mockClientContext);
-
- try {
- showJobCli.run(new String[] { "--name", "my-job" });
- } catch (IOException e) {
- // expected
- }
-
-
- storage.addNewJob("my-job", getMockJobInfo("my-job"));
- showJobCli.run(new String[] { "--name", "my-job" });
- }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/YamlConfigTestUtils.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/YamlConfigTestUtils.java
deleted file mode 100644
index c639ba6..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/YamlConfigTestUtils.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.submarine.client.cli.param.yaml.YamlConfigFile;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.yaml.snakeyaml.Yaml;
-import org.yaml.snakeyaml.constructor.Constructor;
-
-import java.io.File;
-import java.io.IOException;
-import java.io.InputStream;
-
-/**
- * Test utility class for test code that deals with YAML configuration parsing.
- */
-public final class YamlConfigTestUtils {
-
- private YamlConfigTestUtils() {}
-
- public static void deleteFile(File file) {
- if (file != null) {
- file.delete();
- }
- }
-
- public static YamlConfigFile readYamlConfigFile(String filename) {
- Constructor constructor = new Constructor(YamlConfigFile.class);
- constructor.setPropertyUtils(new RunJobParameters.UnderscoreConverterPropertyUtils());
- Yaml yaml = new Yaml(constructor);
- InputStream inputStream = YamlConfigTestUtils.class
- .getClassLoader()
- .getResourceAsStream(filename);
- return yaml.loadAs(inputStream, YamlConfigFile.class);
- }
-
- public static File createTempFileWithContents(String filename)
- throws IOException {
- InputStream inputStream = YamlConfigTestUtils.class
- .getClassLoader()
- .getResourceAsStream(filename);
- File targetFile = File.createTempFile("test", ".yaml");
- FileUtils.copyInputStreamToFile(inputStream, targetFile);
- return targetFile;
- }
-
- public static File createEmptyTempFile() throws IOException {
- return File.createTempFile("test", ".yaml");
- }
-
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingCommonTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingCommonTest.java
deleted file mode 100644
index f75e2b8..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingCommonTest.java
+++ /dev/null
@@ -1,132 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob;
-
-import org.apache.commons.cli.MissingArgumentException;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.param.ParametersHolder;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
-import org.apache.submarine.commons.runtime.MockClientContext;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.RuntimeFactory;
-import org.apache.submarine.commons.runtime.JobMonitor;
-import org.apache.submarine.commons.runtime.JobSubmitter;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-import org.apache.submarine.commons.runtime.fs.SubmarineStorage;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import java.io.IOException;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Matchers.any;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-/**
- * This class contains some test methods to test common functionality
- * (including TF / PyTorch) of the run job Submarine command.
- */
-public class RunJobCliParsingCommonTest {
-
- @Before
- public void before() {
- SubmarineLogs.verboseOff();
- }
-
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
- public static MockClientContext getMockClientContext()
- throws IOException, YarnException, SubmarineException {
- MockClientContext mockClientContext = new MockClientContext("testJob");
- JobSubmitter mockJobSubmitter = mock(JobSubmitter.class);
- when(mockJobSubmitter.submitJob(any(ParametersHolder.class)))
- .thenReturn(ApplicationId.newInstance(1235L, 1));
-
- JobMonitor mockJobMonitor = mock(JobMonitor.class);
- SubmarineStorage storage = mock(SubmarineStorage.class);
- RuntimeFactory rtFactory = mock(RuntimeFactory.class);
-
- when(rtFactory.getJobSubmitterInstance()).thenReturn(mockJobSubmitter);
- when(rtFactory.getJobMonitorInstance()).thenReturn(mockJobMonitor);
- when(rtFactory.getSubmarineStorage()).thenReturn(storage);
-
- mockClientContext.setRuntimeFactory(rtFactory);
- return mockClientContext;
- }
-
- @Test
- public void testAbsentFrameworkFallsBackToTensorFlow() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- runJobCli.run(
- new String[]{"--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input", "--checkpoint_path",
- "hdfs://output",
- "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=4g,vcores=2", "--tensorboard",
- "true", "--verbose", "--wait_job_finish"});
- RunJobParameters runJobParameters = runJobCli.getRunJobParameters();
- assertTrue("Default Framework should be TensorFlow!",
- runJobParameters instanceof TensorFlowRunJobParameters);
- }
-
- @Test
- public void testEmptyFrameworkOption() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(MissingArgumentException.class);
- expectedException.expectMessage("Missing argument for option: framework");
-
- runJobCli.run(
- new String[]{"--framework", "--name", "my-job",
- "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input", "--checkpoint_path",
- "hdfs://output",
- "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=4g,vcores=2", "--tensorboard",
- "true", "--verbose", "--wait_job_finish"});
- }
-
- @Test
- public void testInvalidFrameworkOption() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("Failed to parse Framework type");
-
- runJobCli.run(
- new String[]{"--framework", "bla", "--name", "my-job",
- "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input", "--checkpoint_path",
- "hdfs://output",
- "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=4g,vcores=2", "--tensorboard",
- "true", "--verbose", "--wait_job_finish"});
- }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingCommonYamlTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingCommonYamlTest.java
deleted file mode 100644
index c9d6a2d..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingCommonYamlTest.java
+++ /dev/null
@@ -1,258 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob;
-
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.client.cli.YamlConfigTestUtils;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
-import org.apache.submarine.client.cli.param.yaml.YamlParseException;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.BeforeClass;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-
-import static org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest.getMockClientContext;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * This class contains some test methods to test common YAML parsing
- * functionality (including TF / PyTorch) of the run job Submarine command.
- */
-public class RunJobCliParsingCommonYamlTest {
- private static final String DIR_NAME = "runjob-common-yaml";
- private static final String TF_DIR = "runjob-pytorch-yaml";
- private File yamlConfig;
- private static Logger LOG = LoggerFactory.getLogger(
- RunJobCliParsingCommonYamlTest.class);
-
- @Before
- public void before() {
- SubmarineLogs.verboseOff();
- }
-
- @After
- public void after() {
- YamlConfigTestUtils.deleteFile(yamlConfig);
- }
-
- @BeforeClass
- public static void configureResourceTypes() {
- try {
- ResourceUtils.configureResourceType(ResourceUtils.GPU_URI);
- } catch (SubmarineRuntimeException e) {
- LOG.info("The hadoop dependency doesn't support gpu resource, " +
- "so just skip this test case.");
- }
- }
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
- @Test
- public void testYamlAndCliOptionIsDefinedIsInvalid() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- TF_DIR + "/valid-config.yaml");
- String[] args = new String[] {"--name", "my-job",
- "--docker_image", "tf-docker:1.1.0",
- "-f", yamlConfig.getAbsolutePath() };
-
- exception.expect(YarnException.class);
- exception.expectMessage("defined both with YAML config and with " +
- "CLI argument");
-
- runJobCli.run(args);
- }
-
- @Test
- public void testYamlAndCliOptionIsDefinedIsInvalidWithListOption()
- throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- TF_DIR + "/valid-config.yaml");
- String[] args = new String[] {"--name", "my-job",
- "--quicklink", "AAA=http://master-0:8321",
- "--quicklink", "BBB=http://worker-0:1234",
- "-f", yamlConfig.getAbsolutePath()};
-
- exception.expect(YarnException.class);
- exception.expectMessage("defined both with YAML config and with " +
- "CLI argument");
-
- runJobCli.run(args);
- }
-
- @Test
- public void testFalseValuesForBooleanFields() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/test-false-values.yaml");
- runJobCli.run(
- new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-
- assertTrue(RunJobParameters.class + " must be an instance of " +
- TensorFlowRunJobParameters.class,
- jobRunParameters instanceof TensorFlowRunJobParameters);
- TensorFlowRunJobParameters tensorFlowParams =
- (TensorFlowRunJobParameters) jobRunParameters;
-
- assertFalse(jobRunParameters.isDistributeKeytab());
- assertFalse(jobRunParameters.isWaitJobFinish());
- assertFalse(tensorFlowParams.isTensorboardEnabled());
- }
-
- @Test
- public void testWrongIndentation() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/wrong-indentation.yaml");
-
- exception.expect(YamlParseException.class);
- exception.expectMessage("Failed to parse YAML config, details:");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- }
-
- @Test
- public void testWrongFilename() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- exception.expect(YamlParseException.class);
- runJobCli.run(
- new String[]{"-f", "not-existing", "--verbose"});
- }
-
- @Test
- public void testEmptyFile() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
- yamlConfig = YamlConfigTestUtils.createEmptyTempFile();
-
- exception.expect(YamlParseException.class);
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- }
-
- @Test
- public void testNotExistingFile() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
- exception.expect(YamlParseException.class);
- exception.expectMessage("file does not exist");
- runJobCli.run(
- new String[]{"-f", "blabla", "--verbose"});
- }
-
- @Test
- public void testWrongPropertyName() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/wrong-property-name.yaml");
-
- exception.expect(YamlParseException.class);
- exception.expectMessage("Failed to parse YAML config, details:");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- }
-
- @Test
- public void testMissingConfigsSection() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/missing-configs.yaml");
-
- exception.expect(YamlParseException.class);
- exception.expectMessage("config section should be defined, " +
- "but it cannot be found");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- }
-
- @Test
- public void testMissingSectionsShouldParsed() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/some-sections-missing.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- }
-
-
- @Test
- public void testAbsentFramework() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/missing-framework.yaml");
-
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- }
-
- @Test
- public void testEmptyFramework() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/empty-framework.yaml");
-
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- }
-
- @Test
- public void testInvalidFramework() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/invalid-framework.yaml");
-
- exception.expect(YamlParseException.class);
- exception.expectMessage("framework should is defined, " +
- "but it has an invalid value");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingParameterizedTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingParameterizedTest.java
deleted file mode 100644
index 5e5ab09..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/RunJobCliParsingParameterizedTest.java
+++ /dev/null
@@ -1,196 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob;
-import com.google.common.collect.Lists;
-import org.apache.commons.cli.ParseException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.param.runjob.PyTorchRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
-import org.apache.submarine.commons.runtime.Framework;
-import org.apache.submarine.commons.runtime.MockClientContext;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.JobMonitor;
-import org.apache.submarine.commons.runtime.JobSubmitter;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-
-import static org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest.getMockClientContext;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.mockito.Mockito.mock;
-
-/**
- * This class contains some test methods to test common CLI parsing
- * functionality (including TF / PyTorch) of the run job Submarine command.
- */
-@RunWith(Parameterized.class)
-public class RunJobCliParsingParameterizedTest {
-
- private final Framework framework;
-
- @Before
- public void before() {
- SubmarineLogs.verboseOff();
- }
-
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
- @Parameterized.Parameters
- public static Collection<Object[]> data() {
- Collection<Object[]> params = new ArrayList<>();
- params.add(new Object[]{Framework.TENSORFLOW });
- params.add(new Object[]{Framework.PYTORCH });
- return params;
- }
-
- public RunJobCliParsingParameterizedTest(Framework framework) {
- this.framework = framework;
- }
-
- private String getFrameworkName() {
- return framework.getValue();
- }
-
- @Test
- public void testPrintHelp() throws IOException {
- MockClientContext mockClientContext = new MockClientContext("testJob");
- JobSubmitter mockJobSubmitter = mock(JobSubmitter.class);
- JobMonitor mockJobMonitor = mock(JobMonitor.class);
- RunJobCli runJobCli = new RunJobCli(mockClientContext, mockJobSubmitter,
- mockJobMonitor);
- runJobCli.printUsages();
- }
-
- @Test
- public void testNoInputPathOptionSpecified() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
- String expectedErrorMessage = "\"--" + CliConstants.INPUT_PATH + "\"" +
- " is absent";
- String actualMessage = "";
- try {
- runJobCli.run(
- new String[]{"--framework", getFrameworkName(),
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=4g,vcores=2", "--verbose",
- "--wait_job_finish"});
- } catch (ParseException e) {
- actualMessage = e.getMessage();
- e.printStackTrace();
- }
- assertEquals(expectedErrorMessage, actualMessage);
- }
-
- @Test
- public void testJobWithoutName() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
- String expectedErrorMessage =
- "--" + CliConstants.NAME + " is absent";
- String actualMessage = "";
- try {
- runJobCli.run(
- new String[]{"--framework", getFrameworkName(),
- "--docker_image", "tf-docker:1.1.0",
- "--num_workers", "0", "--verbose"});
- } catch (ParseException e) {
- actualMessage = e.getMessage();
- e.printStackTrace();
- }
- assertEquals(expectedErrorMessage, actualMessage);
- }
-
- @Test
- public void testLaunchCommandPatternReplace() throws Exception {
- RunJobCli runJobCli = new RunJobCli(getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- List<String> parameters = Lists.newArrayList("--framework",
- getFrameworkName(),
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input", "--checkpoint_path",
- "hdfs://output",
- "--num_workers", "3",
- "--worker_launch_cmd", "python run-job.py --input=%input_path% " +
- "--model_dir=%checkpoint_path% " +
- "--export_dir=%saved_model_path%/savedmodel",
- "--worker_resources", "memory=2048,vcores=2");
-
- if (framework == Framework.TENSORFLOW) {
- parameters.addAll(Lists.newArrayList(
- "--ps_resources", "memory=4096,vcores=4",
- "--ps_launch_cmd", "python run-ps.py --input=%input_path% " +
- "--model_dir=%checkpoint_path%/model",
- "--verbose"));
- }
-
- runJobCli.run(parameters.toArray(new String[0]));
-
- RunJobParameters runJobParameters = checkExpectedFrameworkParams(runJobCli);
-
- if (framework == Framework.TENSORFLOW) {
- TensorFlowRunJobParameters tensorFlowParams =
- (TensorFlowRunJobParameters) runJobParameters;
- assertEquals(
- "python run-job.py --input=hdfs://input --model_dir=hdfs://output "
- + "--export_dir=hdfs://output/savedmodel",
- tensorFlowParams.getWorkerLaunchCmd());
- assertEquals(
- "python run-ps.py --input=hdfs://input " +
- "--model_dir=hdfs://output/model",
- tensorFlowParams.getPSLaunchCmd());
- } else if (framework == Framework.PYTORCH) {
- PyTorchRunJobParameters pyTorchParameters =
- (PyTorchRunJobParameters) runJobParameters;
- assertEquals(
- "python run-job.py --input=hdfs://input --model_dir=hdfs://output "
- + "--export_dir=hdfs://output/savedmodel",
- pyTorchParameters.getWorkerLaunchCmd());
- }
- }
-
- private RunJobParameters checkExpectedFrameworkParams(RunJobCli runJobCli) {
- RunJobParameters runJobParameters = runJobCli.getRunJobParameters();
-
- if (framework == Framework.TENSORFLOW) {
- assertTrue(RunJobParameters.class + " must be an instance of " +
- TensorFlowRunJobParameters.class,
- runJobParameters instanceof TensorFlowRunJobParameters);
- } else if (framework == Framework.PYTORCH) {
- assertTrue(RunJobParameters.class + " must be an instance of " +
- PyTorchRunJobParameters.class,
- runJobParameters instanceof PyTorchRunJobParameters);
- }
- return runJobParameters;
- }
-
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/mxnet/RunJobCliParsingMXNetTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/mxnet/RunJobCliParsingMXNetTest.java
deleted file mode 100644
index c6c0678..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/mxnet/RunJobCliParsingMXNetTest.java
+++ /dev/null
@@ -1,175 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.mxnet;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.submarine.client.cli.param.runjob.MXNetRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.runjob.RunJobCli;
-import org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import static org.junit.Assert.*;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test class that verifies the correctness of MXNet
- * CLI configuration parsing.
- */
-public class RunJobCliParsingMXNetTest {
- @Before
- public void before() {
- SubmarineLogs.verboseOff();
- }
-
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
- @Test
- public void testBasicRunJobForSingleNodeTraining() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- runJobCli.run(
- new String[]{"--framework", "mxnet", "--name", "my-job",
- "--docker_image", "mxnet-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--num_workers", "1", "--num_ps", "1", "--worker_launch_cmd",
- "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
- "--ps_resources", "memory=4G,vcores=2", "--ps_launch_cmd",
- "python run-ps.py", "--num_schedulers", "1", "--scheduler_launch_cmd",
- "python run-scheduler.py", "--scheduler_resources", "memory=1024M,vcores=2",
- "--verbose", "--wait_job_finish"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- assertTrue(RunJobParameters.class +
- " must be an instance of " +
- MXNetRunJobParameters.class,
- jobRunParameters instanceof MXNetRunJobParameters);
- MXNetRunJobParameters mxNetParams =
- (MXNetRunJobParameters) jobRunParameters;
-
- assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
- assertEquals(mxNetParams.getNumWorkers(), 1);
- assertEquals(mxNetParams.getWorkerLaunchCmd(), "python run-job.py");
- assertEquals(Resources.createResource(2048, 2),
- mxNetParams.getWorkerResource());
- assertEquals(mxNetParams.getNumPS(), 1);
- assertEquals(mxNetParams.getNumSchedulers(), 1);
- assertTrue(SubmarineLogs.isVerbose());
- assertTrue(jobRunParameters.isWaitJobFinish());
- }
-
- @Test
- public void testBasicRunJobForDistributedTraining() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
- runJobCli.run(
- new String[]{"--framework", "mxnet", "--name", "my-job",
- "--docker_image", "mxnet-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--num_workers", "2", "--num_ps", "2", "--worker_launch_cmd",
- "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
- "--ps_resources", "memory=4G,vcores=2", "--ps_launch_cmd",
- "python run-ps.py", "--num_schedulers", "1", "--scheduler_launch_cmd",
- "python run-scheduler.py", "--scheduler_resources", "memory=1024M,vcores=2",
- "--verbose"});
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- assertTrue(RunJobParameters.class +
- " must be an instance of " +
- MXNetRunJobParameters.class,
- jobRunParameters instanceof MXNetRunJobParameters);
- MXNetRunJobParameters mxNetParams =
- (MXNetRunJobParameters) jobRunParameters;
-
- assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
- assertEquals(jobRunParameters.getDockerImageName(), "mxnet-docker:1.1.0");
- assertEquals(mxNetParams.getNumWorkers(), 2);
- assertEquals(Resources.createResource(2048, 2),
- mxNetParams.getWorkerResource());
- assertEquals(mxNetParams.getWorkerLaunchCmd(), "python run-job.py");
- assertEquals(mxNetParams.getNumPS(), 2);
- assertEquals(Resources.createResource(4096, 2),
- mxNetParams.getPsResource());
- assertEquals(mxNetParams.getPSLaunchCmd(), "python run-ps.py");
- assertEquals(mxNetParams.getNumSchedulers(), 1);
- assertEquals(Resources.createResource(1024, 2),
- mxNetParams.getSchedulerResource());
- assertEquals(mxNetParams.getSchedulerLaunchCmd(), "python run-scheduler.py");
- assertTrue(SubmarineLogs.isVerbose());
- }
-
- @Test
- public void testTensorboardCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for MXNet jobs");
- runJobCli.run(
- new String[]{"--framework", "mxnet",
- "--name", "my-job", "--docker_image", "mxnet-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--num_workers", "2",
- "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=2048M,vcores=2",
- "--tensorboard"});
- }
-
- @Test
- public void testTensorboardResourcesCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for MXNet jobs");
- runJobCli.run(
- new String[]{"--framework", "mxnet",
- "--name", "my-job", "--docker_image", "mxnet-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--num_workers", "2",
- "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=2048M,vcores=2",
- "--tensorboard_resources", "memory=1024M,vcores=2"});
- }
-
- @Test
- public void testTensorboardDockerImageCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for MXNet jobs");
- runJobCli.run(
- new String[]{"--framework", "mxnet",
- "--name", "my-job", "--docker_image", "mxnet-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--num_workers", "2",
- "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=2048M,vcores=2",
- "--tensorboard_docker_image", "TBDockerImage"});
- }
-}
-
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/mxnet/RunJobCliParsingMXNetYamlTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/mxnet/RunJobCliParsingMXNetYamlTest.java
deleted file mode 100644
index 0aa60f6..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/mxnet/RunJobCliParsingMXNetYamlTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.mxnet;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.submarine.client.cli.YamlConfigTestUtils;
-import org.apache.submarine.client.cli.param.runjob.MXNetRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.yaml.YamlParseException;
-import org.apache.submarine.client.cli.runjob.RunJobCli;
-import org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-import org.junit.*;
-import org.junit.rules.ExpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.List;
-
-import static org.junit.Assert.*;
-import static org.junit.Assert.assertNotNull;
-
-/**
- * Test class that verifies the correctness of MXNet
- * YAML configuration parsing.
- */
-public class RunJobCliParsingMXNetYamlTest {
- private static final String OVERRIDDEN_PREFIX = "overridden_";
- private static final String DIR_NAME = "runjob-mxnet-yaml";
- private File yamlConfig;
- private static Logger LOG = LoggerFactory.getLogger(
- RunJobCliParsingMXNetYamlTest.class);
-
- @Before
- public void before() {
- SubmarineLogs.verboseOff();
- }
-
- @After
- public void after() {
- YamlConfigTestUtils.deleteFile(yamlConfig);
- }
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
- private void verifyBasicConfigValues(RunJobParameters jobRunParameters) {
- verifyBasicConfigValues(jobRunParameters,
- ImmutableList.of("env1=env1Value", "env2=env2Value"));
- }
-
- private void verifyBasicConfigValues(RunJobParameters jobRunParameters,
- List<String> expectedEnvs) {
- assertEquals("testInputPath", jobRunParameters.getInputPath());
- assertEquals("testCheckpointPath", jobRunParameters.getCheckpointPath());
- assertEquals("testDockerImage", jobRunParameters.getDockerImageName());
-
- assertNotNull(jobRunParameters.getLocalizations());
- assertEquals(2, jobRunParameters.getLocalizations().size());
-
- assertNotNull(jobRunParameters.getQuicklinks());
- assertEquals(2, jobRunParameters.getQuicklinks().size());
-
- assertTrue(SubmarineLogs.isVerbose());
- assertTrue(jobRunParameters.isWaitJobFinish());
-
- for (String env : expectedEnvs) {
- assertTrue(String.format(
- "%s should be in env list of jobRunParameters!", env),
- jobRunParameters.getEnvars().contains(env));
- }
- }
-
- private void verifyPsValues(RunJobParameters jobRunParameters,
- String prefix) {
- assertTrue(RunJobParameters.class + " must be an instance of " +
- MXNetRunJobParameters.class,
- jobRunParameters instanceof MXNetRunJobParameters);
- MXNetRunJobParameters mxNetParams =
- (MXNetRunJobParameters) jobRunParameters;
-
- assertEquals(4, mxNetParams.getNumPS());
- assertEquals(prefix + "testLaunchCmdPs", mxNetParams.getPSLaunchCmd());
- assertEquals(prefix + "testDockerImagePs",
- mxNetParams.getPsDockerImage());
- assertEquals(Resources.createResource(20500, 34),
- mxNetParams.getPsResource());
- }
-
- private void verifySchedulerValues(RunJobParameters jobRunParameters,
- String prefix) {
- assertTrue(RunJobParameters.class + " must be an instance of " +
- MXNetRunJobParameters.class, jobRunParameters instanceof MXNetRunJobParameters);
- MXNetRunJobParameters mxNetParams = (MXNetRunJobParameters) jobRunParameters;
- assertEquals(1, mxNetParams.getNumSchedulers());
- assertEquals(prefix + "testLaunchCmdScheduler",
- mxNetParams.getSchedulerLaunchCmd());
- assertEquals(prefix + "testDockerImageScheduler", mxNetParams.getSchedulerDockerImage());
- assertEquals(Resources.createResource(10240, 16),
- mxNetParams.getSchedulerResource());
- }
-
- private void verifyWorkerValues(RunJobParameters jobRunParameters, String prefix) {
- MXNetRunJobParameters mxNetParams =
- verifyWorkerCommonValues(jobRunParameters, prefix);
- assertEquals(Resources.createResource(20480, 32),
- mxNetParams.getWorkerResource());
- }
-
- private MXNetRunJobParameters verifyWorkerCommonValues(
- RunJobParameters jobRunParameters, String prefix) {
- assertTrue(RunJobParameters.class + " must be an instance of " +
- MXNetRunJobParameters.class,
- jobRunParameters instanceof MXNetRunJobParameters);
- MXNetRunJobParameters mxNetParams =
- (MXNetRunJobParameters) jobRunParameters;
-
- assertEquals(3, mxNetParams.getNumWorkers());
- assertEquals(prefix + "testLaunchCmdWorker",
- mxNetParams.getWorkerLaunchCmd());
- assertEquals(prefix + "testDockerImageWorker",
- mxNetParams.getWorkerDockerImage());
- return mxNetParams;
- }
-
- private void verifyWorkerValuesWithGpu(RunJobParameters jobRunParameters, String prefix) {
- MXNetRunJobParameters mxNetParams =
- verifyWorkerCommonValues(jobRunParameters, prefix);
- Resource workResource = Resources.createResource(20480, 32);
- ResourceUtils.setResource(workResource, ResourceUtils.GPU_URI, 2);
- assertEquals(workResource, mxNetParams.getWorkerResource());
- }
-
- private void verifySecurityValues(RunJobParameters jobRunParameters) {
- assertEquals("keytabPath", jobRunParameters.getKeytab());
- assertEquals("testPrincipal", jobRunParameters.getPrincipal());
- assertTrue(jobRunParameters.isDistributeKeytab());
- }
-
- @Test
- public void testValidYamlParsing() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/valid-config.yaml");
- runJobCli.run(
- new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters);
- verifyPsValues(jobRunParameters, "");
- verifySchedulerValues(jobRunParameters, "");
- verifyWorkerValues(jobRunParameters, "");
- verifySecurityValues(jobRunParameters);
- }
-
- @Test
- public void testValidGpuYamlParsing() throws Exception {
- try {
- ResourceUtils.configureResourceType(ResourceUtils.GPU_URI);
- } catch (SubmarineRuntimeException e) {
- LOG.info("The hadoop dependency doesn't support gpu resource, " +
- "so just skip this test case.");
- return;
- }
-
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/valid-gpu-config.yaml");
- runJobCli.run(
- new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters);
- verifyPsValues(jobRunParameters, "");
- verifySchedulerValues(jobRunParameters, "");
- verifyWorkerValuesWithGpu(jobRunParameters, "");
- verifySecurityValues(jobRunParameters);
- }
-
- @Test
- public void testRoleOverrides() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/valid-config-with-overrides.yaml");
-
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters);
- verifyPsValues(jobRunParameters, OVERRIDDEN_PREFIX);
- verifySchedulerValues(jobRunParameters, OVERRIDDEN_PREFIX);
- verifyWorkerValues(jobRunParameters, OVERRIDDEN_PREFIX);
- verifySecurityValues(jobRunParameters);
- }
-
- @Test
- public void testMissingPrincipalUnderSecuritySection() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/security-principal-is-missing.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters);
- verifyPsValues(jobRunParameters, "");
- verifySchedulerValues(jobRunParameters, "");
- verifyWorkerValues(jobRunParameters, "");
-
- //Verify security values
- assertEquals("keytabPath", jobRunParameters.getKeytab());
- assertNull("Principal should be null!", jobRunParameters.getPrincipal());
- assertTrue(jobRunParameters.isDistributeKeytab());
- }
-
- @Test
- public void testMissingEnvs() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/envs-are-missing.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters, ImmutableList.of());
- verifyPsValues(jobRunParameters, "");
- verifySchedulerValues(jobRunParameters, "");
- verifyWorkerValues(jobRunParameters, "");
- verifySecurityValues(jobRunParameters);
- }
-
- @Test
- public void testInvalidConfigTensorboardSectionIsDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- exception.expect(YamlParseException.class);
- exception.expectMessage("TensorBoard section should not be defined " +
- "when TensorFlow is not the selected framework!");
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/invalid-config-tensorboard-section.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- }
-}
\ No newline at end of file
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/pytorch/RunJobCliParsingPyTorchTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/pytorch/RunJobCliParsingPyTorchTest.java
deleted file mode 100644
index b1ec19b..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/pytorch/RunJobCliParsingPyTorchTest.java
+++ /dev/null
@@ -1,282 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.pytorch;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.submarine.client.cli.param.runjob.PyTorchRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.runjob.RunJobCli;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test class that verifies the correctness of PyTorch
- * CLI configuration parsing.
- */
-public class RunJobCliParsingPyTorchTest {
-
- @Before
- public void before() {
- SubmarineLogs.verboseOff();
- }
-
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
- @Test
- public void testBasicRunJobForSingleNodeTraining() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- runJobCli.run(
- new String[]{"--framework", "pytorch",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input", "--checkpoint_path",
- "hdfs://output",
- "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=4g,vcores=2", "--verbose",
- "--wait_job_finish"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- assertTrue(RunJobParameters.class +
- " must be an instance of " +
- PyTorchRunJobParameters.class,
- jobRunParameters instanceof PyTorchRunJobParameters);
- PyTorchRunJobParameters pyTorchParams =
- (PyTorchRunJobParameters) jobRunParameters;
-
- assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
- assertEquals(jobRunParameters.getCheckpointPath(), "hdfs://output");
- assertEquals(pyTorchParams.getNumWorkers(), 1);
- assertEquals(pyTorchParams.getWorkerLaunchCmd(),
- "python run-job.py");
- assertEquals(Resources.createResource(4096, 2),
- pyTorchParams.getWorkerResource());
- assertTrue(SubmarineLogs.isVerbose());
- assertTrue(jobRunParameters.isWaitJobFinish());
- }
-
- @Test
- public void testNumPSCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for PyTorch jobs");
- runJobCli.run(
- new String[]{"--framework", "pytorch",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "3",
- "--worker_launch_cmd",
- "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
- "--num_ps", "2"});
- }
-
- @Test
- public void testPSResourcesCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for PyTorch jobs");
- runJobCli.run(
- new String[]{"--framework", "pytorch",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "3",
- "--worker_launch_cmd",
- "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
- "--ps_resources", "memory=2048M,vcores=2"});
- }
-
- @Test
- public void testPSDockerImageCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for PyTorch jobs");
- runJobCli.run(
- new String[]{"--framework", "pytorch",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "3",
- "--worker_launch_cmd",
- "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
- "--ps_docker_image", "psDockerImage"});
- }
-
- @Test
- public void testPSLaunchCommandCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for PyTorch jobs");
- runJobCli.run(
- new String[]{"--framework", "pytorch",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "3",
- "--worker_launch_cmd",
- "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
- "--ps_launch_cmd", "psLaunchCommand"});
- }
-
- @Test
- public void testTensorboardCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for PyTorch jobs");
- runJobCli.run(
- new String[]{"--framework", "pytorch",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "3",
- "--worker_launch_cmd",
- "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
- "--tensorboard"});
- }
-
- @Test
- public void testTensorboardResourcesCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for PyTorch jobs");
- runJobCli.run(
- new String[]{"--framework", "pytorch",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "3",
- "--worker_launch_cmd",
- "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
- "--tensorboard_resources", "memory=2048M,vcores=2"});
- }
-
- @Test
- public void testTensorboardDockerImageCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for PyTorch jobs");
- runJobCli.run(
- new String[]{"--framework", "pytorch",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "3",
- "--worker_launch_cmd",
- "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
- "--tensorboard_docker_image", "TBDockerImage"});
- }
-
- @Test
- public void testNumSchedulerCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for PyTorch jobs");
- runJobCli.run(
- new String[]{"--framework", "pytorch",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "3",
- "--worker_launch_cmd",
- "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
- "--num_schedulers", "1"});
- }
-
- @Test
- public void testSchedulerResourcesCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for PyTorch jobs");
- runJobCli.run(
- new String[]{"--framework", "pytorch",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "3",
- "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=2048M,vcores=2",
- "--scheduler_resources", "memory=2048M,vcores=2"});
- }
-
- @Test
- public void testSchedulerDockerImageCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for PyTorch jobs");
- runJobCli.run(
- new String[]{"--framework", "pytorch",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "3",
- "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=2048M,vcores=2",
- "--scheduler_docker_image", "schedulerDockerImage"});
- }
-
- @Test
- public void testSchedulerLaunchCommandCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for PyTorch jobs");
- runJobCli.run(
- new String[]{"--framework", "pytorch",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "3",
- "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=2048M,vcores=2",
- "--scheduler_launch_cmd", "schedulerLaunchCommand"});
- }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/pytorch/RunJobCliParsingPyTorchYamlTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/pytorch/RunJobCliParsingPyTorchYamlTest.java
deleted file mode 100644
index 2ab075f..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/pytorch/RunJobCliParsingPyTorchYamlTest.java
+++ /dev/null
@@ -1,271 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.pytorch;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-import java.io.File;
-import java.util.List;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.submarine.client.cli.YamlConfigTestUtils;
-import org.apache.submarine.client.cli.param.runjob.PyTorchRunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.yaml.YamlParseException;
-import org.apache.submarine.client.cli.runjob.RunJobCli;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import com.google.common.collect.ImmutableList;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Test class that verifies the correctness of PyTorch
- * YAML configuration parsing.
- */
-public class RunJobCliParsingPyTorchYamlTest {
- private static final String OVERRIDDEN_PREFIX = "overridden_";
- private static final String DIR_NAME = "runjob-pytorch-yaml";
- private File yamlConfig;
- private static Logger LOG = LoggerFactory.getLogger(
- RunJobCliParsingPyTorchYamlTest.class);
-
- @Before
- public void before() {
- SubmarineLogs.verboseOff();
- }
-
- @After
- public void after() {
- YamlConfigTestUtils.deleteFile(yamlConfig);
- }
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
- private void verifyBasicConfigValues(RunJobParameters jobRunParameters) {
- verifyBasicConfigValues(jobRunParameters,
- ImmutableList.of("env1=env1Value", "env2=env2Value"));
- }
-
- private void verifyBasicConfigValues(RunJobParameters jobRunParameters,
- List<String> expectedEnvs) {
- assertEquals("testInputPath", jobRunParameters.getInputPath());
- assertEquals("testCheckpointPath", jobRunParameters.getCheckpointPath());
- Assert.assertEquals("testDockerImage", jobRunParameters.getDockerImageName());
-
- assertNotNull(jobRunParameters.getLocalizations());
- assertEquals(2, jobRunParameters.getLocalizations().size());
-
- assertNotNull(jobRunParameters.getQuicklinks());
- assertEquals(2, jobRunParameters.getQuicklinks().size());
-
- assertTrue(SubmarineLogs.isVerbose());
- assertTrue(jobRunParameters.isWaitJobFinish());
-
- for (String env : expectedEnvs) {
- assertTrue(String.format(
- "%s should be in env list of jobRunParameters!", env),
- jobRunParameters.getEnvars().contains(env));
- }
- }
-
- private PyTorchRunJobParameters verifyWorkerCommonValues(RunJobParameters
- jobRunParameters, String prefix) {
- assertTrue(RunJobParameters.class + " must be an instance of " +
- PyTorchRunJobParameters.class,
- jobRunParameters instanceof PyTorchRunJobParameters);
- PyTorchRunJobParameters pyTorchParams =
- (PyTorchRunJobParameters) jobRunParameters;
-
- assertEquals(3, pyTorchParams.getNumWorkers());
- assertEquals(prefix + "testLaunchCmdWorker",
- pyTorchParams.getWorkerLaunchCmd());
- assertEquals(prefix + "testDockerImageWorker",
- pyTorchParams.getWorkerDockerImage());
- return pyTorchParams;
- }
-
- private void verifyWorkerValues(RunJobParameters jobRunParameters,
- String prefix) {
- PyTorchRunJobParameters pyTorchParams = verifyWorkerCommonValues
- (jobRunParameters, prefix);
- assertEquals(Resources.createResource(20480, 32),
- pyTorchParams.getWorkerResource());
- }
-
- private void verifyWorkerValuesWithGpu(RunJobParameters jobRunParameters,
- String prefix) {
-
- PyTorchRunJobParameters pyTorchParams = verifyWorkerCommonValues
- (jobRunParameters, prefix);
- Resource workResource = Resources.createResource(20480, 32);
- ResourceUtils.setResource(workResource, ResourceUtils.GPU_URI, 2);
- assertEquals(workResource, pyTorchParams.getWorkerResource());
- }
-
- private void verifySecurityValues(RunJobParameters jobRunParameters) {
- assertEquals("keytabPath", jobRunParameters.getKeytab());
- assertEquals("testPrincipal", jobRunParameters.getPrincipal());
- assertTrue(jobRunParameters.isDistributeKeytab());
- }
-
- @Test
- public void testValidYamlParsing() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/valid-config.yaml");
- runJobCli.run(
- new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters);
- verifyWorkerValues(jobRunParameters, "");
- verifySecurityValues(jobRunParameters);
- }
-
- @Test
- public void testValidGpuYamlParsing() throws Exception {
- try {
- ResourceUtils.configureResourceType(ResourceUtils.GPU_URI);
- } catch (SubmarineRuntimeException e) {
- LOG.info("The hadoop dependency doesn't support gpu resource, " +
- "so just skip this test case.");
- return;
- }
-
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/valid-gpu-config.yaml");
- runJobCli.run(
- new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters);
- verifyWorkerValuesWithGpu(jobRunParameters, "");
- verifySecurityValues(jobRunParameters);
- }
-
- @Test
- public void testRoleOverrides() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/valid-config-with-overrides.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters);
- verifyWorkerValues(jobRunParameters, OVERRIDDEN_PREFIX);
- verifySecurityValues(jobRunParameters);
- }
-
- @Test
- public void testMissingPrincipalUnderSecuritySection() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/security-principal-is-missing.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters);
- verifyWorkerValues(jobRunParameters, "");
-
- //Verify security values
- assertEquals("keytabPath", jobRunParameters.getKeytab());
- assertNull("Principal should be null!", jobRunParameters.getPrincipal());
- assertTrue(jobRunParameters.isDistributeKeytab());
- }
-
- @Test
- public void testMissingEnvs() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/envs-are-missing.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters, ImmutableList.of());
- verifyWorkerValues(jobRunParameters, "");
- verifySecurityValues(jobRunParameters);
- }
-
- @Test
- public void testInvalidConfigPsSectionIsDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
- exception.expect(YamlParseException.class);
- exception.expectMessage("PS section should not be defined " +
- "when PyTorch is the selected framework");
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/invalid-config-ps-section.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- }
-
- @Test
- public void testInvalidConfigTensorboardSectionIsDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
- exception.expect(YamlParseException.class);
- exception.expectMessage("TensorBoard section should not be defined " +
- "when TensorFlow is not the selected framework!");
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/invalid-config-tensorboard-section.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- }
-
- @Test
- public void testInvalidConfigSchedulerSectionIsDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
- exception.expect(YamlParseException.class);
- exception.expectMessage("Scheduler section should not be defined " +
- "when MXNet is not the selected framework!");
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/invalid-config-scheduler-section.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowTest.java
deleted file mode 100644
index 1dfaf42..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowTest.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.tensorflow;
-
-import org.apache.commons.cli.ParseException;
-import org.apache.submarine.client.cli.CliConstants;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
-import org.apache.submarine.client.cli.runjob.RunJobCli;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test class that verifies the correctness of TensorFlow
- * CLI configuration parsing.
- */
-public class RunJobCliParsingTensorFlowTest {
-
- @Before
- public void before() {
- SubmarineLogs.verboseOff();
- }
-
- @Rule
- public ExpectedException expectedException = ExpectedException.none();
-
- @Test
- public void testNoInputPathOptionSpecified() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- String expectedErrorMessage = "\"--" + CliConstants.INPUT_PATH +
- "\" is absent";
- String actualMessage = "";
- try {
- runJobCli.run(
- new String[]{"--framework", "tensorflow",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=4g,vcores=2", "--tensorboard",
- "true", "--verbose", "--wait_job_finish"});
- } catch (ParseException e) {
- actualMessage = e.getMessage();
- e.printStackTrace();
- }
- assertEquals(expectedErrorMessage, actualMessage);
- }
-
- @Test
- public void testBasicRunJobForDistributedTraining() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
- assertFalse(SubmarineLogs.isVerbose());
-
- runJobCli.run(
- new String[] { "--framework", "tensorflow",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input",
- "--checkpoint_path", "hdfs://output",
- "--num_workers", "3", "--num_ps", "2", "--worker_launch_cmd",
- "python run-job.py", "--worker_resources", "memory=2048M,vcores=2",
- "--ps_resources", "memory=4G,vcores=4", "--tensorboard", "true",
- "--ps_launch_cmd", "python run-ps.py", "--keytab", "/keytab/path",
- "--principal", "user/_HOST@domain.com", "--distribute_keytab",
- "--verbose" });
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- assertTrue(RunJobParameters.class +
- " must be an instance of " +
- TensorFlowRunJobParameters.class,
- jobRunParameters instanceof TensorFlowRunJobParameters);
- TensorFlowRunJobParameters tensorFlowParams =
- (TensorFlowRunJobParameters) jobRunParameters;
-
- assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
- assertEquals(jobRunParameters.getCheckpointPath(), "hdfs://output");
- assertEquals(tensorFlowParams.getNumPS(), 2);
- assertEquals(tensorFlowParams.getPSLaunchCmd(), "python run-ps.py");
- assertEquals(Resources.createResource(4096, 4),
- tensorFlowParams.getPsResource());
- assertEquals(tensorFlowParams.getWorkerLaunchCmd(),
- "python run-job.py");
- assertEquals(Resources.createResource(2048, 2),
- tensorFlowParams.getWorkerResource());
- assertEquals(jobRunParameters.getDockerImageName(),
- "tf-docker:1.1.0");
- assertEquals(jobRunParameters.getKeytab(),
- "/keytab/path");
- assertEquals(jobRunParameters.getPrincipal(),
- "user/_HOST@domain.com");
- assertTrue(jobRunParameters.isDistributeKeytab());
- assertTrue(SubmarineLogs.isVerbose());
- }
-
- @Test
- public void testBasicRunJobForSingleNodeTraining() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- runJobCli.run(
- new String[] { "--framework", "tensorflow",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input", "--checkpoint_path",
- "hdfs://output",
- "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=4g,vcores=2", "--tensorboard",
- "true", "--verbose", "--wait_job_finish" });
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- assertTrue(RunJobParameters.class +
- " must be an instance of " +
- TensorFlowRunJobParameters.class,
- jobRunParameters instanceof TensorFlowRunJobParameters);
- TensorFlowRunJobParameters tensorFlowParams =
- (TensorFlowRunJobParameters) jobRunParameters;
-
- assertEquals(jobRunParameters.getInputPath(), "hdfs://input");
- assertEquals(jobRunParameters.getCheckpointPath(), "hdfs://output");
- assertEquals(tensorFlowParams.getNumWorkers(), 1);
- assertEquals(tensorFlowParams.getWorkerLaunchCmd(),
- "python run-job.py");
- assertEquals(Resources.createResource(4096, 2),
- tensorFlowParams.getWorkerResource());
- assertTrue(SubmarineLogs.isVerbose());
- assertTrue(jobRunParameters.isWaitJobFinish());
- }
-
- /**
- * when only run tensorboard, input_path is not needed
- * */
- @Test
- public void testNoInputPathOptionButOnlyRunTensorboard() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- boolean success = true;
- try {
- runJobCli.run(
- new String[]{"--framework", "tensorflow",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--num_workers", "0", "--tensorboard", "--verbose",
- "--tensorboard_resources", "memory=2G,vcores=2",
- "--tensorboard_docker_image", "tb_docker_image:001"});
- } catch (ParseException e) {
- success = false;
- }
- assertTrue(success);
- }
-
- @Test
- public void testNumSchedulerCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for TensorFlow jobs");
- runJobCli.run(
- new String[] {"--framework", "tensorflow",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
- "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=4g,vcores=2", "--tensorboard", "true",
- "--verbose", "--wait_job_finish", "--num_schedulers", "1"});
- }
-
- @Test
- public void testSchedulerResourcesCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for TensorFlow jobs");
- runJobCli.run(
- new String[] {"--framework", "tensorflow",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
- "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=4g,vcores=2", "--tensorboard", "true",
- "--verbose", "--wait_job_finish",
- "--scheduler_resources", "memory=2048M,vcores=2"});
- }
-
- @Test
- public void testSchedulerDockerImageCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for TensorFlow jobs");
- runJobCli.run(
- new String[] {"--framework", "tensorflow",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
- "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=4g,vcores=2", "--tensorboard", "true",
- "--verbose", "--wait_job_finish",
- "--scheduler_docker_image", "schedulerDockerImage"});
- }
-
- @Test
- public void testSchedulerLaunchCommandCannotBeDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- assertFalse(SubmarineLogs.isVerbose());
-
- expectedException.expect(ParseException.class);
- expectedException.expectMessage("cannot be defined for TensorFlow jobs");
- runJobCli.run(
- new String[] {"--framework", "tensorflow",
- "--name", "my-job", "--docker_image", "tf-docker:1.1.0",
- "--input_path", "hdfs://input", "--checkpoint_path", "hdfs://output",
- "--num_workers", "1", "--worker_launch_cmd", "python run-job.py",
- "--worker_resources", "memory=4g,vcores=2", "--tensorboard", "true",
- "--verbose", "--wait_job_finish",
- "--scheduler_launch_cmd", "schedulerLaunchCommand"});
- }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowYamlStandaloneTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowYamlStandaloneTest.java
deleted file mode 100644
index 4fb4b3e..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowYamlStandaloneTest.java
+++ /dev/null
@@ -1,206 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.tensorflow;
-
-import org.apache.submarine.client.cli.param.yaml.Configs;
-import org.apache.submarine.client.cli.param.yaml.Role;
-import org.apache.submarine.client.cli.param.yaml.Roles;
-import org.apache.submarine.client.cli.param.yaml.Scheduling;
-import org.apache.submarine.client.cli.param.yaml.Security;
-import org.apache.submarine.client.cli.param.yaml.Spec;
-import org.apache.submarine.client.cli.param.yaml.TensorBoard;
-import org.apache.submarine.client.cli.param.yaml.YamlConfigFile;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.client.cli.YamlConfigTestUtils;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.util.List;
-import java.util.Map;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test class that verifies the correctness of YAML configuration parsing.
- * Please note that this class just tests YAML parsing,
- * but only in an isolated fashion.
- */
-public class RunJobCliParsingTensorFlowYamlStandaloneTest {
- private static final String OVERRIDDEN_PREFIX = "overridden_";
- private static final String DIR_NAME = "runjob-tensorflow-yaml";
-
- private void verifyBasicConfigValues(YamlConfigFile yamlConfigFile) {
- assertNotNull("Spec file should not be null!", yamlConfigFile);
- Spec spec = yamlConfigFile.getSpec();
- assertNotNull("Spec should not be null!", spec);
-
- assertEquals("testJobName", spec.getName());
- assertEquals("testJobType", spec.getJobType());
-
- Configs configs = yamlConfigFile.getConfigs();
- assertNotNull("Configs should not be null!", configs);
-
- assertEquals("testInputPath", configs.getInputPath());
- assertEquals("testCheckpointPath", configs.getCheckpointPath());
- assertEquals("testSavedModelPath", configs.getSavedModelPath());
- assertEquals("testDockerImage", configs.getDockerImage());
-
- Map<String, String> envs = configs.getEnvs();
- assertNotNull("Envs should not be null!", envs);
- assertEquals(2, envs.size());
- assertEquals("env1Value", envs.get("env1"));
- assertEquals("env2Value", envs.get("env2"));
-
- List<String> localizations = configs.getLocalizations();
- assertNotNull("Localizations should not be null!", localizations);
- assertEquals("Size of localizations must be 2!", 2, localizations.size());
- assertEquals("hdfs://remote-file1:/local-filename1:rw",
- localizations.get(0));
- assertEquals("nfs://remote-file2:/local-filename2:rw",
- localizations.get(1));
-
- List<String> mounts = configs.getMounts();
- assertNotNull("Mounts should not be null!", mounts);
- assertEquals("Size of mounts must be 2!", 2, mounts.size());
- assertEquals("/etc/passwd:/etc/passwd:rw", mounts.get(0));
- assertEquals("/etc/hosts:/etc/hosts:rw", mounts.get(1));
-
- assertTrue(
- configs.getQuicklinks().contains("Notebook_UI=https://master-0:7070"));
- assertTrue(
- configs.getQuicklinks().contains("Notebook_UI2=https://master-0:7071"));
- assertEquals("true", configs.getWaitJobFinish());
- }
-
- private void assertRoleConfigOverrides(Role role, String prefix,
- String roleType) {
- assertNotNull(roleType + " role should not be null!", role);
-
- assertEquals(String.format("%stestDockerImage%s", prefix, roleType),
- role.getDockerImage());
-
- //envs, localizations and mounts for Roles
- // are only present in valid-config-with-overrides.yaml
- boolean validateAll = !prefix.equals("");
- if (validateAll) {
- Map<String, String> envs = role.getEnvs();
- assertNotNull("Envs should not be null!", envs);
- assertEquals(String.format("%senv1%s", prefix, roleType),
- envs.get("env1"));
- assertEquals(String.format("%senv2%s", prefix, roleType),
- envs.get("env2"));
- }
-
- if (validateAll) {
- List<String> localizations = role.getLocalizations();
- assertNotNull("Localizations should not be null!", localizations);
- assertEquals("Size of localizations must be 2!", 2, localizations.size());
- assertEquals(String.format("hdfs://remote-file1:/%slocal" +
- "-filename1%s:rw", prefix, roleType), localizations.get(0));
- assertEquals(String.format("nfs://remote-file2:/%slocal" +
- "-filename2%s:rw", prefix, roleType), localizations.get(1));
- }
-
- if (validateAll) {
- List<String> mounts = role.getMounts();
- assertNotNull("Mounts should not be null!", mounts);
- assertEquals("Size of mounts must be 2!", 2, mounts.size());
- assertEquals(String.format("/etc/passwd:/%s%s", prefix, roleType),
- mounts.get(0));
- assertEquals(String.format("/etc/hosts:/%s%s", prefix, roleType),
- mounts.get(1));
- }
- }
-
- private void assertWorkerValues(Role worker) {
- assertEquals("testLaunchCmdWorker", worker.getLaunchCmd());
- assertEquals("testDockerImageWorker", worker.getDockerImage());
- assertEquals("memory=20480M,vcores=32", worker.getResources());
- assertEquals(3, worker.getReplicas());
- }
-
- private void assertPsValues(Role ps) {
- assertEquals("testLaunchCmdPs", ps.getLaunchCmd());
- assertEquals("testDockerImagePs", ps.getDockerImage());
- assertEquals("memory=20500M,vcores=34", ps.getResources());
- assertEquals(4, ps.getReplicas());
- }
-
- private void verifySchedulingValues(YamlConfigFile yamlConfigFile) {
- Scheduling scheduling = yamlConfigFile.getScheduling();
- assertNotNull("Scheduling should not be null!", scheduling);
- assertEquals("queue1", scheduling.getQueue());
- }
-
- private void verifySecurityValues(YamlConfigFile yamlConfigFile) {
- Security security = yamlConfigFile.getSecurity();
- assertNotNull("Security should not be null!", security);
- assertEquals("keytabPath", security.getKeytab());
- assertEquals("testPrincipal", security.getPrincipal());
- assertTrue(security.isDistributeKeytab());
- }
-
- private void verifyTensorboardValues(YamlConfigFile yamlConfigFile) {
- TensorBoard tensorBoard = yamlConfigFile.getTensorBoard();
- assertNotNull("Tensorboard should not be null!", tensorBoard);
- assertEquals("tensorboardDockerImage", tensorBoard.getDockerImage());
- assertEquals("memory=21000M,vcores=37", tensorBoard.getResources());
- }
-
- @Before
- public void before() {
- SubmarineLogs.verboseOff();
- }
-
- @Test
- public void testLaunchCommandYaml() {
- YamlConfigFile yamlConfigFile = YamlConfigTestUtils.readYamlConfigFile(DIR_NAME +
- "/valid-config.yaml");
-
- verifyBasicConfigValues(yamlConfigFile);
-
- Roles roles = yamlConfigFile.getRoles();
- assertNotNull("Roles should not be null!", roles);
- assertRoleConfigOverrides(roles.getWorker(), "", "Worker");
- assertRoleConfigOverrides(roles.getPs(), "", "Ps");
-
- assertWorkerValues(roles.getWorker());
- assertPsValues(roles.getPs());
-
- verifySchedulingValues(yamlConfigFile);
- verifySecurityValues(yamlConfigFile);
- verifyTensorboardValues(yamlConfigFile);
- }
-
- @Test
- public void testOverrides() {
- YamlConfigFile yamlConfigFile = YamlConfigTestUtils.readYamlConfigFile(DIR_NAME +
- "/valid-config-with-overrides.yaml");
-
- verifyBasicConfigValues(yamlConfigFile);
-
- Roles roles = yamlConfigFile.getRoles();
- assertNotNull("Roles should not be null!", roles);
- assertRoleConfigOverrides(roles.getWorker(), OVERRIDDEN_PREFIX, "Worker");
- assertRoleConfigOverrides(roles.getPs(), OVERRIDDEN_PREFIX, "Ps");
- }
-}
diff --git a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowYamlTest.java b/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowYamlTest.java
deleted file mode 100644
index 894fc08..0000000
--- a/submarine-client/src/test/java/org/apache/submarine/client/cli/runjob/tensorflow/RunJobCliParsingTensorFlowYamlTest.java
+++ /dev/null
@@ -1,308 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.client.cli.runjob.tensorflow;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.submarine.client.cli.YamlConfigTestUtils;
-import org.apache.submarine.client.cli.param.runjob.RunJobParameters;
-import org.apache.submarine.client.cli.param.runjob.TensorFlowRunJobParameters;
-import org.apache.submarine.client.cli.param.yaml.YamlParseException;
-import org.apache.submarine.client.cli.runjob.RunJobCli;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.submarine.commons.runtime.resource.ResourceUtils;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.submarine.client.cli.runjob.RunJobCliParsingCommonTest;
-import org.junit.After;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Rule;
-import org.junit.Test;
-import org.junit.rules.ExpectedException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.File;
-import java.util.List;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
-
-/**
- * Test class that verifies the correctness of TF YAML configuration parsing.
- */
-public class RunJobCliParsingTensorFlowYamlTest {
- private static final String OVERRIDDEN_PREFIX = "overridden_";
- private static final String DIR_NAME = "runjob-tensorflow-yaml";
- private File yamlConfig;
- private static Logger LOG = LoggerFactory.getLogger(
- RunJobCliParsingTensorFlowYamlTest.class);
-
- @Before
- public void before() {
- SubmarineLogs.verboseOff();
- }
-
- @After
- public void after() {
- YamlConfigTestUtils.deleteFile(yamlConfig);
- }
-
- @Rule
- public ExpectedException exception = ExpectedException.none();
-
- private void verifyBasicConfigValues(RunJobParameters jobRunParameters) {
- verifyBasicConfigValues(jobRunParameters,
- ImmutableList.of("env1=env1Value", "env2=env2Value"));
- }
-
- private void verifyBasicConfigValues(RunJobParameters jobRunParameters,
- List<String> expectedEnvs) {
- assertEquals("testInputPath", jobRunParameters.getInputPath());
- assertEquals("testCheckpointPath", jobRunParameters.getCheckpointPath());
- assertEquals("testDockerImage", jobRunParameters.getDockerImageName());
-
- assertNotNull(jobRunParameters.getLocalizations());
- assertEquals(2, jobRunParameters.getLocalizations().size());
-
- assertNotNull(jobRunParameters.getQuicklinks());
- assertEquals(2, jobRunParameters.getQuicklinks().size());
-
- assertTrue(SubmarineLogs.isVerbose());
- assertTrue(jobRunParameters.isWaitJobFinish());
-
- for (String env : expectedEnvs) {
- assertTrue(String.format(
- "%s should be in env list of jobRunParameters!", env),
- jobRunParameters.getEnvars().contains(env));
- }
- }
-
- private void verifyPsValues(RunJobParameters jobRunParameters,
- String prefix) {
- assertTrue(RunJobParameters.class + " must be an instance of " +
- TensorFlowRunJobParameters.class,
- jobRunParameters instanceof TensorFlowRunJobParameters);
- TensorFlowRunJobParameters tensorFlowParams =
- (TensorFlowRunJobParameters) jobRunParameters;
-
- assertEquals(4, tensorFlowParams.getNumPS());
- assertEquals(prefix + "testLaunchCmdPs", tensorFlowParams.getPSLaunchCmd());
- assertEquals(prefix + "testDockerImagePs",
- tensorFlowParams.getPsDockerImage());
- assertEquals(Resources.createResource(20500, 34),
- tensorFlowParams.getPsResource());
- }
-
- private TensorFlowRunJobParameters verifyWorkerCommonValues(
- RunJobParameters jobRunParameters, String prefix) {
- assertTrue(RunJobParameters.class + " must be an instance of " +
- TensorFlowRunJobParameters.class,
- jobRunParameters instanceof TensorFlowRunJobParameters);
- TensorFlowRunJobParameters tensorFlowParams =
- (TensorFlowRunJobParameters) jobRunParameters;
-
- assertEquals(3, tensorFlowParams.getNumWorkers());
- assertEquals(prefix + "testLaunchCmdWorker",
- tensorFlowParams.getWorkerLaunchCmd());
- assertEquals(prefix + "testDockerImageWorker",
- tensorFlowParams.getWorkerDockerImage());
- return tensorFlowParams;
- }
-
- private void verifyWorkerValues(RunJobParameters jobRunParameters,
- String prefix) {
- TensorFlowRunJobParameters tensorFlowParams = verifyWorkerCommonValues
- (jobRunParameters, prefix);
- assertEquals(Resources.createResource(20480, 32),
- tensorFlowParams.getWorkerResource());
- }
-
- private void verifyWorkerValuesWithGpu(RunJobParameters jobRunParameters,
- String prefix) {
- TensorFlowRunJobParameters tensorFlowParams = verifyWorkerCommonValues
- (jobRunParameters, prefix);
- Resource workResource = Resources.createResource(20480, 32);
- ResourceUtils.setResource(workResource, ResourceUtils.GPU_URI, 2);
- assertEquals(workResource, tensorFlowParams.getWorkerResource());
- }
-
- private void verifySecurityValues(RunJobParameters jobRunParameters) {
- assertEquals("keytabPath", jobRunParameters.getKeytab());
- assertEquals("testPrincipal", jobRunParameters.getPrincipal());
- assertTrue(jobRunParameters.isDistributeKeytab());
- }
-
- private void verifyTensorboardValues(RunJobParameters jobRunParameters) {
- assertTrue(RunJobParameters.class + " must be an instance of " +
- TensorFlowRunJobParameters.class,
- jobRunParameters instanceof TensorFlowRunJobParameters);
- TensorFlowRunJobParameters tensorFlowParams =
- (TensorFlowRunJobParameters) jobRunParameters;
-
- assertTrue(tensorFlowParams.isTensorboardEnabled());
- assertEquals("tensorboardDockerImage",
- tensorFlowParams.getTensorboardDockerImage());
- assertEquals(Resources.createResource(21000, 37),
- tensorFlowParams.getTensorboardResource());
- }
-
- @Test
- public void testValidYamlParsing() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/valid-config.yaml");
- runJobCli.run(
- new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters);
- verifyPsValues(jobRunParameters, "");
- verifyWorkerValues(jobRunParameters, "");
- verifySecurityValues(jobRunParameters);
- verifyTensorboardValues(jobRunParameters);
- }
-
- @Test
- public void testValidGpuYamlParsing() throws Exception {
- try {
- ResourceUtils.configureResourceType(ResourceUtils.GPU_URI);
- } catch (SubmarineRuntimeException e) {
- LOG.info("The hadoop dependency doesn't support gpu resource, " +
- "so just skip this test case.");
- return;
- }
-
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/valid-gpu-config.yaml");
- runJobCli.run(
- new String[] {"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters);
- verifyPsValues(jobRunParameters, "");
- verifyWorkerValuesWithGpu(jobRunParameters, "");
- verifySecurityValues(jobRunParameters);
- verifyTensorboardValues(jobRunParameters);
- }
-
- @Test
- public void testRoleOverrides() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
- Assert.assertFalse(SubmarineLogs.isVerbose());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/valid-config-with-overrides.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters);
- verifyPsValues(jobRunParameters, OVERRIDDEN_PREFIX);
- verifyWorkerValues(jobRunParameters, OVERRIDDEN_PREFIX);
- verifySecurityValues(jobRunParameters);
- verifyTensorboardValues(jobRunParameters);
- }
-
- @Test
- public void testMissingPrincipalUnderSecuritySection() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/security-principal-is-missing.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters);
- verifyPsValues(jobRunParameters, "");
- verifyWorkerValues(jobRunParameters, "");
- verifyTensorboardValues(jobRunParameters);
-
- //Verify security values
- assertEquals("keytabPath", jobRunParameters.getKeytab());
- assertNull("Principal should be null!", jobRunParameters.getPrincipal());
- assertTrue(jobRunParameters.isDistributeKeytab());
- }
-
- @Test
- public void testMissingTensorBoardDockerImage() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/tensorboard-dockerimage-is-missing.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
-
- verifyBasicConfigValues(jobRunParameters);
- verifyPsValues(jobRunParameters, "");
- verifyWorkerValues(jobRunParameters, "");
- verifySecurityValues(jobRunParameters);
-
- TensorFlowRunJobParameters tensorFlowParams =
- (TensorFlowRunJobParameters) jobRunParameters;
-
- assertTrue(tensorFlowParams.isTensorboardEnabled());
- assertNull("tensorboardDockerImage should be null!",
- tensorFlowParams.getTensorboardDockerImage());
- assertEquals(Resources.createResource(21000, 37),
- tensorFlowParams.getTensorboardResource());
- }
-
- @Test
- public void testMissingEnvs() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/envs-are-missing.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
-
- RunJobParameters jobRunParameters = runJobCli.getRunJobParameters();
- verifyBasicConfigValues(jobRunParameters, ImmutableList.of());
- verifyPsValues(jobRunParameters, "");
- verifyWorkerValues(jobRunParameters, "");
- verifySecurityValues(jobRunParameters);
- verifyTensorboardValues(jobRunParameters);
- }
-
- @Test
- public void testInvalidConfigSchedulerSectionIsDefined() throws Exception {
- RunJobCli runJobCli = new RunJobCli(RunJobCliParsingCommonTest.getMockClientContext());
-
- exception.expect(YamlParseException.class);
- exception.expectMessage("Scheduler section should not be defined " +
- "when MXNet is not the selected framework!");
- yamlConfig = YamlConfigTestUtils.createTempFileWithContents(
- DIR_NAME + "/invalid-config-scheduler-section.yaml");
- runJobCli.run(
- new String[]{"-f", yamlConfig.getAbsolutePath(), "--verbose"});
- }
-}
diff --git a/submarine-commons/commons-runtime/pom.xml b/submarine-commons/commons-runtime/pom.xml
index f8ab7dd..e17cd7c 100644
--- a/submarine-commons/commons-runtime/pom.xml
+++ b/submarine-commons/commons-runtime/pom.xml
@@ -76,121 +76,6 @@
<artifactId>jackson-core</artifactId>
</dependency>
- <!-- Dependencies for Hadoop -->
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-common</artifactId>
- <version>${hadoop.version}</version>
- <scope>provided</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpcore</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-lang</groupId>
- <artifactId>commons-lang</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-collections</groupId>
- <artifactId>commons-collections</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.guava</groupId>
- <artifactId>guava</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-api</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>com.google.code.findbugs</groupId>
- <artifactId>jsr305</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-client</artifactId>
- <version>${hadoop.version}</version>
- <exclusions>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-jaxrs</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-xc</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-mapper-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.codehaus.jackson</groupId>
- <artifactId>jackson-core-asl</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-log4j12</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.commons</groupId>
- <artifactId>commons-lang3</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-codec</groupId>
- <artifactId>commons-codec</artifactId>
- </exclusion>
- <exclusion>
- <groupId>commons-logging</groupId>
- <artifactId>commons-logging</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
</dependencies>
<build>
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/ClientContext.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/ClientContext.java
deleted file mode 100644
index a2a07c1..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/ClientContext.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime;
-
-import org.apache.submarine.commons.utils.SubmarineConfiguration;
-import org.apache.submarine.commons.runtime.fs.DefaultRemoteDirectoryManager;
-import org.apache.submarine.commons.runtime.fs.RemoteDirectoryManager;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.client.api.YarnClient;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-
-public class ClientContext {
- private Configuration yarnConf = new YarnConfiguration();
-
- private volatile RemoteDirectoryManager remoteDirectoryManager;
- private YarnClient yarnClient;
- private SubmarineConfiguration submarineConfig;
- private RuntimeFactory runtimeFactory;
-
- public ClientContext() {
- submarineConfig = SubmarineConfiguration.getInstance();
- }
-
- public synchronized YarnClient getOrCreateYarnClient() {
- if (yarnClient == null) {
- yarnClient = YarnClient.createYarnClient();
- yarnClient.init(yarnConf);
- yarnClient.start();
- }
- return yarnClient;
- }
-
- public Configuration getYarnConfig() {
- return yarnConf;
- }
-
- public void setYarnConfig(Configuration conf) {
- this.yarnConf = conf;
- }
-
- public RemoteDirectoryManager getRemoteDirectoryManager() {
- if (remoteDirectoryManager == null) {
- synchronized (this) {
- if (remoteDirectoryManager == null) {
- remoteDirectoryManager = new DefaultRemoteDirectoryManager(this);
- }
- }
- }
- return remoteDirectoryManager;
- }
-
- public SubmarineConfiguration getSubmarineConfig() {
- return submarineConfig;
- }
-
- public void setSubmarineConfig(SubmarineConfiguration submarineConfig) {
- this.submarineConfig = submarineConfig;
- }
-
- public RuntimeFactory getRuntimeFactory() {
- return runtimeFactory;
- }
-
- public void setRuntimeFactory(RuntimeFactory runtimeFactory) {
- this.runtimeFactory = runtimeFactory;
- }
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/JobMonitor.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/JobMonitor.java
deleted file mode 100644
index ca6bfc6..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/JobMonitor.java
+++ /dev/null
@@ -1,94 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime;
-
-import org.apache.submarine.commons.runtime.api.JobState;
-import org.apache.submarine.commons.runtime.api.JobStatus;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.IOException;
-
-/**
- * Monitor status of job(s)
- */
-public abstract class JobMonitor {
- private static final Logger LOG =
- LoggerFactory.getLogger(JobMonitor.class);
- protected ClientContext clientContext;
-
- public JobMonitor(ClientContext clientContext) {
- this.clientContext = clientContext;
- }
-
- /**
- * Returns status of training job.
- *
- * @param jobName name of job
- * @return job status
- * @throws IOException anything else happens
- * @throws YarnException anything related to YARN happens
- */
- public abstract JobStatus getTrainingJobStatus(String jobName)
- throws IOException, YarnException;
-
- /**
- * Cleanup AppAdminClient, etc.
- */
- public void cleanup() throws IOException {}
-
- /**
- * Continue wait and print status if job goes to ready or final state.
- * @param jobName
- * @throws IOException
- * @throws YarnException
- * @throws SubmarineException
- */
- public void waitTrainingFinal(String jobName)
- throws IOException, YarnException, SubmarineException {
- // Wait 5 sec between each fetch.
- int waitIntervalSec = 5;
- JobStatus js;
- while (true) {
- js = getTrainingJobStatus(jobName);
- JobState jobState = js.getState();
- js.nicePrint(System.err);
-
- if (JobState.isFinal(jobState)) {
- if (jobState.equals(JobState.FAILED)) {
- throw new SubmarineException("Job failed");
- } else if (jobState.equals(JobState.KILLED)) {
- throw new SubmarineException("Job killed");
- }
- LOG.info("Job exited with state=" + jobState);
- break;
- }
-
- try {
- Thread.sleep(waitIntervalSec * 1000);
- } catch (InterruptedException e) {
- throw new IOException(e);
- }
- }
- cleanup();
- }
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/JobSubmitter.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/JobSubmitter.java
deleted file mode 100644
index a7d23ca..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/JobSubmitter.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime;
-
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.commons.runtime.exception.SubmarineException;
-import org.apache.submarine.commons.runtime.param.Parameter;
-
-import java.io.IOException;
-
-/**
- * Submit job to cluster master.
- */
-public interface JobSubmitter {
- /**
- * Submit a job to cluster.
- * @param parameters run job parameters
- * @return applicationId when successfully submitted
- * @throws YarnException for issues while contacting YARN daemons
- * @throws IOException for other issues.
- */
- ApplicationId submitJob(Parameter parameters)
- throws IOException, YarnException, SubmarineException;
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/RuntimeFactory.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/RuntimeFactory.java
deleted file mode 100644
index f6a65ad..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/RuntimeFactory.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.submarine.commons.utils.SubmarineConfiguration;
-import org.apache.submarine.commons.utils.SubmarineConfVars;
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.submarine.commons.runtime.fs.SubmarineStorage;
-
-import java.lang.reflect.InvocationTargetException;
-
-public abstract class RuntimeFactory {
- protected ClientContext clientContext;
- private JobSubmitter jobSubmitter;
- private JobMonitor jobMonitor;
- private SubmarineStorage submarineStorage;
-
- public RuntimeFactory(ClientContext clientContext) {
- this.clientContext = clientContext;
- }
-
- public static RuntimeFactory getRuntimeFactory(
- ClientContext clientContext, ClassLoader classLoader) {
- SubmarineConfiguration submarineConfiguration =
- clientContext.getSubmarineConfig();
- String runtimeClass = submarineConfiguration.getString(
- SubmarineConfVars.ConfVars.SUBMARINE_RUNTIME_CLASS);
-
- try {
- Class<?> runtimeClazz = Class.forName(runtimeClass, true, classLoader);
- if (RuntimeFactory.class.isAssignableFrom(runtimeClazz)) {
- return (RuntimeFactory) runtimeClazz.getConstructor(ClientContext.class).newInstance(clientContext);
- } else {
- throw new SubmarineRuntimeException("Class: " + runtimeClass
- + " not instance of " + RuntimeFactory.class.getCanonicalName());
- }
- } catch (ClassNotFoundException | IllegalAccessException |
- InstantiationException | NoSuchMethodException |
- InvocationTargetException e) {
- throw new SubmarineRuntimeException(
- "Could not instantiate RuntimeFactory: " + runtimeClass, e);
- }
- }
-
- protected abstract JobSubmitter internalCreateJobSubmitter();
-
- protected abstract JobMonitor internalCreateJobMonitor();
-
- protected abstract SubmarineStorage internalCreateSubmarineStorage();
-
- public synchronized JobSubmitter getJobSubmitterInstance() {
- if (jobSubmitter == null) {
- jobSubmitter = internalCreateJobSubmitter();
- }
- return jobSubmitter;
- }
-
- public synchronized JobMonitor getJobMonitorInstance() {
- if (jobMonitor == null) {
- jobMonitor = internalCreateJobMonitor();
- }
- return jobMonitor;
- }
-
- public synchronized SubmarineStorage getSubmarineStorage() {
- if (submarineStorage == null) {
- submarineStorage = internalCreateSubmarineStorage();
- }
- return submarineStorage;
- }
-
- @VisibleForTesting
- public synchronized void setJobSubmitterInstance(JobSubmitter jobSubmitter) {
- this.jobSubmitter = jobSubmitter;
- }
-
- @VisibleForTesting
- public synchronized void setJobMonitorInstance(JobMonitor jobMonitor) {
- this.jobMonitor = jobMonitor;
- }
-
- @VisibleForTesting
- public synchronized void setSubmarineStorage(SubmarineStorage storage) {
- this.submarineStorage = storage;
- }
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/DefaultRemoteDirectoryManager.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/DefaultRemoteDirectoryManager.java
deleted file mode 100644
index c7de978..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/DefaultRemoteDirectoryManager.java
+++ /dev/null
@@ -1,168 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime.fs;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.submarine.commons.runtime.ClientContext;
-
-import java.io.File;
-import java.io.IOException;
-import java.net.URI;
-
-/**
- * Manages remote directories for staging, log, etc.
- * TODO(keqiu), need to properly handle permission / name validation, etc.
- */
-public class DefaultRemoteDirectoryManager implements RemoteDirectoryManager {
- private FileSystem fs;
- private Configuration conf;
-
- public DefaultRemoteDirectoryManager(ClientContext context) {
- this.conf = context.getYarnConfig();
- try {
- this.fs = FileSystem.get(context.getYarnConfig());
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- }
-
- @Override
- public Path getJobStagingArea(String jobName, boolean create)
- throws IOException {
- Path staging = new Path(getJobRootFolder(jobName), "staging");
- if (create) {
- createFolderIfNotExist(staging);
- }
-
- // Get a file status to make sure it is a absolute path.
- FileStatus fStatus = fs.getFileStatus(staging);
- return fStatus.getPath();
- }
-
- @Override
- public Path getJobCheckpointDir(String jobName, boolean create)
- throws IOException {
- Path jobDir = new Path(getJobStagingArea(jobName, create),
- "checkpoint_path");
- if (create) {
- createFolderIfNotExist(jobDir);
- }
- return jobDir;
- }
-
- @Override
- public Path getModelDir(String modelName, boolean create)
- throws IOException {
- Path modelDir = new Path(new Path("submarine", "models"), modelName);
- if (create) {
- createFolderIfNotExist(modelDir);
- }
- return modelDir;
- }
-
- @Override
- public FileSystem getDefaultFileSystem() {
- return fs;
- }
-
- @Override
- public FileSystem getFileSystemByUri(String uri) throws IOException {
- return FileSystem.get(URI.create(uri), conf);
- }
-
- @Override
- public Path getUserRootFolder() throws IOException {
- Path rootPath = new Path("submarine", "jobs");
- createFolderIfNotExist(rootPath);
- // Get a file status to make sure it is a absolute path.
- FileStatus fStatus = fs.getFileStatus(rootPath);
- return fStatus.getPath();
- }
-
- @Override
- public boolean isDir(String uri) throws IOException {
- if (isRemote(uri)) {
- return getFileSystemByUri(uri).getFileStatus(new Path(uri)).isDirectory();
- }
- return new File(uri).isDirectory();
- }
-
- @Override
- public boolean isRemote(String uri) {
- String scheme = new Path(uri).toUri().getScheme();
- if (null == scheme) {
- return false;
- }
- return !scheme.startsWith("file://");
- }
-
- @Override
- public boolean copyRemoteToLocal(String remoteUri, String localUri)
- throws IOException {
- // Delete old to avoid failure in FileUtil.copy
- File old = new File(localUri);
- if (old.exists()) {
- if (!FileUtil.fullyDelete(old)) {
- throw new IOException("Failed to delete dir:"
- + old.getAbsolutePath());
- }
- }
- return FileUtil.copy(getFileSystemByUri(remoteUri), new Path(remoteUri),
- new File(localUri), false,
- conf);
- }
-
- @Override
- public boolean existsRemoteFile(String uri) throws IOException {
- return getFileSystemByUri(uri).exists(new Path(uri));
- }
-
- @Override
- public FileStatus getRemoteFileStatus(String uri) throws IOException {
- return getFileSystemByUri(uri).getFileStatus(new Path(uri));
- }
-
- @Override
- public long getRemoteFileSize(String uri) throws IOException {
- return getFileSystemByUri(uri)
- .getContentSummary(new Path(uri)).getSpaceConsumed();
- }
-
- private Path getJobRootFolder(String jobName) throws IOException {
- Path userRoot = getUserRootFolder();
- Path jobRootPath = new Path(userRoot, jobName);
- createFolderIfNotExist(jobRootPath);
- // Get a file status to make sure it is a absolute path.
- FileStatus fStatus = fs.getFileStatus(jobRootPath);
- return fStatus.getPath();
- }
-
- private void createFolderIfNotExist(Path path) throws IOException {
- if (!fs.exists(path)) {
- if (!fs.mkdirs(path)) {
- throw new IOException("Failed to create folder=" + path);
- }
- }
- }
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/FSBasedSubmarineStorageImpl.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/FSBasedSubmarineStorageImpl.java
deleted file mode 100644
index befb8c9..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/FSBasedSubmarineStorageImpl.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime.fs;
-
-import org.apache.hadoop.fs.FSDataInputStream;
-import org.apache.hadoop.fs.FSDataOutputStream;
-import org.apache.hadoop.fs.Path;
-import org.apache.submarine.commons.runtime.ClientContext;
-
-import java.io.IOException;
-import java.io.ObjectInput;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutput;
-import java.io.ObjectOutputStream;
-import java.util.Map;
-
-/**
- * A super naive FS-based storage.
- */
-public class FSBasedSubmarineStorageImpl extends SubmarineStorage {
- RemoteDirectoryManager rdm;
-
- public FSBasedSubmarineStorageImpl(ClientContext clientContext) {
- rdm = clientContext.getRemoteDirectoryManager();
- }
-
- @Override
- public void addNewJob(String jobName, Map<String, String> jobInfo)
- throws IOException {
- Path jobInfoPath = getJobInfoPath(jobName, true);
- FSDataOutputStream fos = rdm.getDefaultFileSystem().create(jobInfoPath);
- serializeMap(fos, jobInfo);
- }
-
- @Override
- public Map<String, String> getJobInfoByName(String jobName)
- throws IOException {
- Path jobInfoPath = getJobInfoPath(jobName, false);
- FSDataInputStream fis = rdm.getDefaultFileSystem().open(jobInfoPath);
- return deserializeMap(fis);
- }
-
- @Override
- public void addNewModel(String modelName, String version,
- Map<String, String> modelInfo) throws IOException {
- Path modelInfoPath = getModelInfoPath(modelName, version, true);
- FSDataOutputStream fos = rdm.getDefaultFileSystem().create(modelInfoPath);
- serializeMap(fos, modelInfo);
- }
-
- @Override
- public Map<String, String> getModelInfoByName(String modelName,
- String version) throws IOException {
- Path modelInfoPath = getModelInfoPath(modelName, version, false);
- FSDataInputStream fis = rdm.getDefaultFileSystem().open(modelInfoPath);
- return deserializeMap(fis);
- }
-
- private Path getModelInfoPath(String modelName, String version, boolean create)
- throws IOException {
- Path modelDir = rdm.getModelDir(modelName, create);
- return new Path(modelDir, version + ".info");
- }
-
- private void serializeMap(FSDataOutputStream fos, Map<String, String> map)
- throws IOException {
- ObjectOutput oo = new ObjectOutputStream(fos);
- oo.writeObject(map);
- oo.close();
- }
-
- private Map<String, String> deserializeMap(FSDataInputStream fis)
- throws IOException {
- ObjectInput oi = new ObjectInputStream(fis);
- Map<String, String> newMap;
- try {
- newMap = (Map<String, String>) oi.readObject();
- } catch (ClassNotFoundException e) {
- throw new IOException(e);
- }
- return newMap;
- }
-
- private Path getJobInfoPath(String jobName, boolean create) throws IOException {
- Path path = rdm.getJobStagingArea(jobName, create);
- return new Path(path, "job.info");
- }
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/RemoteDirectoryManager.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/RemoteDirectoryManager.java
deleted file mode 100644
index bac953a..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/fs/RemoteDirectoryManager.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime.fs;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-
-import java.io.IOException;
-
-public interface RemoteDirectoryManager {
- Path getJobStagingArea(String jobName, boolean create) throws IOException;
-
- Path getJobCheckpointDir(String jobName, boolean create) throws IOException;
-
- Path getModelDir(String modelName, boolean create) throws IOException;
-
- FileSystem getDefaultFileSystem() throws IOException;
-
- FileSystem getFileSystemByUri(String uri) throws IOException;
-
- Path getUserRootFolder() throws IOException;
-
- boolean isDir(String uri) throws IOException;
-
- boolean isRemote(String uri) throws IOException;
-
- boolean copyRemoteToLocal(String remoteUri, String localUri)
- throws IOException;
-
- boolean existsRemoteFile(String uri) throws IOException;
-
- FileStatus getRemoteFileStatus(String uri) throws IOException;
-
- long getRemoteFileSize(String uri) throws IOException;
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/BaseParameters.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/BaseParameters.java
deleted file mode 100644
index 85c565e..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/BaseParameters.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-/**
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License. See accompanying LICENSE file.
- */
-
-package org.apache.submarine.commons.runtime.param;
-
-import org.apache.submarine.commons.runtime.ClientContext;
-import org.apache.submarine.commons.runtime.conf.SubmarineLogs;
-import org.apache.commons.cli.ParseException;
-import org.apache.hadoop.yarn.exceptions.YarnException;
-
-import java.io.IOException;
-
-/**
- * Base class of all parameters.
- */
-public abstract class BaseParameters {
- private String name;
-
- public void updateParameters(Parameter parameter, ClientContext clientContext)
- throws ParseException, IOException, YarnException {
- String name = parameter.getOptionValue("name");
- if (name == null) {
- throw new ParseException("--name is absent");
- }
-
- if (parameter.hasOption("verbose")) {
- SubmarineLogs.verboseOn();
- }
-
- this.setName(name);
- }
-
- public String getName() {
- return name;
- }
-
- public BaseParameters setName(String name) {
- this.name = name;
- return this;
- }
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/Parameter.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/Parameter.java
deleted file mode 100644
index 95fffe1..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/param/Parameter.java
+++ /dev/null
@@ -1,45 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime.param;
-
-import org.apache.hadoop.yarn.exceptions.YarnException;
-import org.apache.submarine.commons.runtime.Framework;
-
-import java.util.List;
-
-public interface Parameter {
- /**
- * Get the ML framework
- * @return Framework
- */
- Framework getFramework();
-
- Parameter setFramework(Framework framework);
-
- BaseParameters getParameters();
-
- Parameter setParameters(BaseParameters parameters);
-
- String getOptionValue(String option) throws YarnException;
-
- boolean hasOption(String option);
-
- List<String> getOptionValues(String option) throws YarnException;
-}
diff --git a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/resource/ResourceUtils.java b/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/resource/ResourceUtils.java
deleted file mode 100644
index 3abb261..0000000
--- a/submarine-commons/commons-runtime/src/main/java/org/apache/submarine/commons/runtime/resource/ResourceUtils.java
+++ /dev/null
@@ -1,360 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime.resource;
-
-import org.apache.submarine.commons.runtime.exception.SubmarineRuntimeException;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-/**
- * This class implements some methods with the almost the same logic as
- * org.apache.hadoop.yarn.util.resource.ResourceUtils of hadoop 3.3.
- * If the hadoop dependencies are upgraded to 3.3, this class can be refactored
- * with org.apache.hadoop.yarn.util.resource.ResourceUtils.
- */
-public final class ResourceUtils {
-
- private static final String RES_PATTERN = "^[^=]+=\\d+\\s?\\w*$";
- private static final String SET_RESOURCE_VALUE_METHOD = "setResourceValue";
- private static final String SET_MEMORY_SIZE_METHOD = "setMemorySize";
- private static final String DEPRECATED_SET_MEMORY_SIZE_METHOD =
- "setMemory";
- private static final String GET_MEMORY_SIZE_METHOD = "getMemorySize";
- private static final String DEPRECATED_GET_MEMORY_SIZE_METHOD =
- "getMemory";
- private static final String GET_RESOURCE_VALUE_METHOD = "getResourceValue";
- private static final String GET_RESOURCE_TYPE_METHOD =
- "getResourcesTypeInfo";
- private static final String GET_RESOURCES = "getResources";
- private static final String REINITIALIZE_RESOURCES_METHOD =
- "reinitializeResources";
- public static final String MEMORY_URI = "memory-mb";
- public static final String VCORES_URI = "vcores";
- public static final String GPU_URI = "yarn.io/gpu";
- public static final String FPGA_URI = "yarn.io/fpga";
-
- private static final Logger LOG =
- LoggerFactory.getLogger(ResourceUtils.class);
-
- private ResourceUtils() {
- }
-
- public static Resource createResourceFromString(String resourceStr) {
- Map<String, Long> typeToValue = parseResourcesString(resourceStr);
- return createResource(typeToValue);
- }
-
- public static Resource createResource(Map<String, Long> typeToValue) {
- Resource resource = Resource.newInstance(0, 0);
- for (Map.Entry<String, Long> entry : typeToValue.entrySet()) {
- if (entry.getKey().equalsIgnoreCase(VCORES_URI)) {
- resource.setVirtualCores(entry.getValue().intValue());
- continue;
- } else if (entry.getKey().equalsIgnoreCase(MEMORY_URI)) {
- setMemorySize(resource, entry.getValue());
- continue;
- }
- setResource(resource, entry.getKey(), entry.getValue().intValue());
- }
- return resource;
- }
-
- public static Map<String, Long> getResourceMap(Resource resource) {
- Map<String, Long> resourceMap;
- if (resource == null) {
- resourceMap = new HashMap<>();
- } else {
- String resourceValue = resource.toString();
- // Delete <> in the resourceValue and replace ":" with "="
- resourceMap =
- parseResourcesString(
- resourceValue.substring(1, resourceValue.length() - 1)
- .replaceAll(":", "=")
- .replaceAll("memory", "memory-mb"));
- }
- return resourceMap;
- }
-
- private static Map<String, Long> parseResourcesString(String resourcesStr) {
- Map<String, Long> resources = new HashMap<>();
- String[] pairs = resourcesStr.trim().split(",");
- for (String resource : pairs) {
- resource = resource.trim();
- if (!resource.matches(RES_PATTERN)) {
- throw new IllegalArgumentException("\"" + resource + "\" is not a "
- + "valid resource type/amount pair. "
- + "Please provide key=amount pairs separated by commas.");
- }
- String[] splits = resource.split("=");
- String key = splits[0], value = splits[1];
- String units = getUnits(value);
-
- String valueWithoutUnit = value.substring(0,
- value.length() - units.length()).trim();
- long resourceValue = Long.parseLong(valueWithoutUnit);
-
- // Convert commandline unit to standard YARN unit.
- if (units.equals("M") || units.equals("m")) {
- units = "Mi";
- } else if (units.equals("G") || units.equals("g")) {
- units = "Gi";
- } else if (!units.isEmpty()) {
- throw new IllegalArgumentException("Acceptable units are M/G or empty");
- }
-
- // special handle memory-mb and memory
- if (key.equals(MEMORY_URI)) {
- if (!units.isEmpty()) {
- resourceValue = UnitsConversionUtil.convert(units, "Mi",
- resourceValue);
- }
- }
-
- if (key.equals("memory")) {
- key = MEMORY_URI;
- resourceValue = UnitsConversionUtil.convert(units, "Mi",
- resourceValue);
- }
-
- // special handle gpu
- if (key.equals("gpu")) {
- key = GPU_URI;
- }
-
- // special handle fpga
- if (key.equals("fpga")) {
- key = FPGA_URI;
- }
-
- resources.put(key, resourceValue);
- }
- return resources;
- }
-
- /**
- * As hadoop 2.9.2 and lower don't support resources except cpu and memory.
- * Use reflection to set GPU or other resources for compatibility with
- * hadoop 2.9.2
- */
- public static void setResource(Resource resource, String resourceName,
- int resourceValue) {
- try {
- Method method = resource.getClass().getMethod(SET_RESOURCE_VALUE_METHOD,
- String.class, long.class);
- method.invoke(resource, resourceName, resourceValue);
- } catch (NoSuchMethodException e) {
- LOG.error("There is no '" + SET_RESOURCE_VALUE_METHOD + "' API in this" +
- "version of YARN", e);
- throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
- } catch (IllegalAccessException | InvocationTargetException e) {
- LOG.error("Failed to invoke '" + SET_RESOURCE_VALUE_METHOD +
- "' method to set GPU resources", e);
- throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
- }
- return;
- }
-
- public static void setMemorySize(Resource resource, Long memorySize) {
- boolean useWithIntParameter = false;
- // For hadoop 2.9.2 and above
- try {
- Method method = resource.getClass().getMethod(SET_MEMORY_SIZE_METHOD,
- long.class);
- method.setAccessible(true);
- method.invoke(resource, memorySize);
- } catch (NoSuchMethodException nsme) {
- LOG.info("There is no '" + SET_MEMORY_SIZE_METHOD + "(long)' API in" +
- " this version of YARN");
- useWithIntParameter = true;
- } catch (IllegalAccessException | InvocationTargetException e) {
- LOG.error("Failed to invoke '" + SET_MEMORY_SIZE_METHOD +
- "' method", e);
- throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
- }
- // For hadoop 2.7.3
- if (useWithIntParameter) {
- try {
- LOG.info("Trying to use '" + DEPRECATED_SET_MEMORY_SIZE_METHOD +
- "(int)' API for this version of YARN");
- Method method = resource.getClass().getMethod(
- DEPRECATED_SET_MEMORY_SIZE_METHOD, int.class);
- method.invoke(resource, memorySize.intValue());
- } catch (NoSuchMethodException e) {
- LOG.error("There is no '" + DEPRECATED_SET_MEMORY_SIZE_METHOD +
- "(int)' API in this version of YARN", e);
- throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
- } catch (IllegalAccessException | InvocationTargetException e) {
- LOG.error("Failed to invoke '" + DEPRECATED_SET_MEMORY_SIZE_METHOD +
- "' method", e);
- throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
- }
- }
- }
-
- public static long getMemorySize(Resource resource) {
- boolean useWithIntParameter = false;
- long memory = 0;
- // For hadoop 2.9.2 and above
- try {
- Method method = resource.getClass().getMethod(GET_MEMORY_SIZE_METHOD);
- method.setAccessible(true);
- memory = (long) method.invoke(resource);
- } catch (NoSuchMethodException e) {
- LOG.info("There is no '" + GET_MEMORY_SIZE_METHOD + "' API in" +
- " this version of YARN");
- useWithIntParameter = true;
- } catch (IllegalAccessException | InvocationTargetException e) {
- LOG.error("Failed to invoke '" + GET_MEMORY_SIZE_METHOD +
- "' method", e);
- throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
- }
- // For hadoop 2.7.3
- if (useWithIntParameter) {
- try {
- LOG.info("Trying to use '" + DEPRECATED_GET_MEMORY_SIZE_METHOD +
- "' API for this version of YARN");
- Method method = resource.getClass().getMethod(
- DEPRECATED_GET_MEMORY_SIZE_METHOD);
- method.setAccessible(true);
- memory = ((Integer) method.invoke(resource)).longValue();
- } catch (NoSuchMethodException e) {
- LOG.error("There is no '" + DEPRECATED_GET_MEMORY_SIZE_METHOD +
- "' API in this version of YARN", e);
- throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
- } catch (IllegalAccessException | InvocationTargetException e) {
- LOG.error("Failed to invoke '" + DEPRECATED_GET_MEMORY_SIZE_METHOD +
- "' method", e);
- throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
- }
- }
- return memory;
- }
-
- /**
- * As hadoop 2.9.2 and lower don't support resources except cpu and memory.
- * Use reflection to set GPU or other resources for compatibility with
- * hadoop 2.9.2
- */
- public static long getResourceValue(Resource resource, String resourceName) {
- long resourceValue = 0;
- try {
- Method method = resource.getClass().getMethod(GET_RESOURCE_VALUE_METHOD,
- String.class);
- Object value = method.invoke(resource, resourceName);
- resourceValue = (long) value;
- } catch (NoSuchMethodException e) {
- LOG.info("There is no '" + GET_RESOURCE_VALUE_METHOD + "' API in this" +
- " version of YARN");
- } catch (InvocationTargetException e) {
- if (e.getTargetException().getClass().getName().equals(
- "org.apache.hadoop.yarn.exceptions.ResourceNotFoundException")) {
- LOG.info("Not found resource " + resourceName);
- } else {
- LOG.info("Failed to invoke '" + GET_RESOURCE_VALUE_METHOD + "'" +
- " method to get resource " + resourceName);
- throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
- }
- } catch (IllegalAccessException | ClassCastException e) {
- LOG.error("Failed to invoke '" + GET_RESOURCE_VALUE_METHOD +
- "' method to get resource " + resourceName, e);
- throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
- }
- return resourceValue;
- }
-
- /**
- * As hadoop 2.9.2 and lower don't support resources except cpu and memory.
- * Use reflection to add GPU or other resources for compatibility with
- * hadoop 2.9.2
- */
- public static void configureResourceType(String resrouceName) {
- Class resourceTypeInfo;
- try {
- resourceTypeInfo = Class.forName(
- "org.apache.hadoop.yarn.api.records.ResourceTypeInfo");
- Class resourceUtils = Class.forName(
- "org.apache.hadoop.yarn.util.resource.ResourceUtils");
- Method method = resourceUtils.getMethod(GET_RESOURCE_TYPE_METHOD);
- Object resTypes = method.invoke(null);
-
- Method resourceTypeInstance = resourceTypeInfo.getMethod("newInstance",
- String.class, String.class);
- Object resourceType = resourceTypeInstance.invoke(null, resrouceName, "");
- ((ArrayList) resTypes).add(resourceType);
-
- Method reInitialMethod = resourceUtils.getMethod(
- REINITIALIZE_RESOURCES_METHOD, List.class);
- reInitialMethod.invoke(null, resTypes);
-
- } catch (ClassNotFoundException e) {
- LOG.info("There is no specified class API in this" +
- " version of YARN");
- LOG.info(e.getMessage());
- throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
- } catch (NoSuchMethodException nsme) {
- LOG.info("There is no '" + GET_RESOURCE_VALUE_METHOD + "' or '" +
- REINITIALIZE_RESOURCES_METHOD + "' API in this" +
- " version of YARN");
- } catch (IllegalAccessException | InvocationTargetException e) {
- LOG.info("Failed to invoke 'configureResourceType' method ", e);
- throw new SubmarineRuntimeException(e.getMessage(), e.getCause());
- }
- }
-
- private static String getUnits(String resourceValue) {
- return parseResourceValue(resourceValue)[0];
- }
-
- /**
- * Extract unit and actual value from resource value.
- * @param resourceValue Value of the resource
- * @return Array containing unit and value. [0]=unit, [1]=value
- * @throws IllegalArgumentException if units contain non alpha characters
- */
- private static String[] parseResourceValue(String resourceValue) {
- String[] resource = new String[2];
- int i = 0;
- for (; i < resourceValue.length(); i++) {
- if (Character.isAlphabetic(resourceValue.charAt(i))) {
- break;
- }
- }
- String units = resourceValue.substring(i);
-
- if (StringUtils.isAlpha(units) || units.equals("")) {
- resource[0] = units;
- resource[1] = resourceValue.substring(0, i);
- return resource;
- } else {
- throw new IllegalArgumentException("Units '" + units + "'"
- + " contains non alphabet characters, which is not allowed.");
- }
- }
-
-}
diff --git a/submarine-commons/commons-runtime/src/test/java/org/apache/submarine/commons/runtime/MockClientContext.java b/submarine-commons/commons-runtime/src/test/java/org/apache/submarine/commons/runtime/MockClientContext.java
deleted file mode 100644
index 4f55bf7..0000000
--- a/submarine-commons/commons-runtime/src/test/java/org/apache/submarine/commons/runtime/MockClientContext.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime;
-
-import org.apache.submarine.commons.runtime.fs.MockRemoteDirectoryManager;
-import org.apache.submarine.commons.runtime.fs.RemoteDirectoryManager;
-
-import java.io.IOException;
-
-public class MockClientContext extends ClientContext {
-
- private RemoteDirectoryManager remoteDirectoryMgr;
-
- public MockClientContext(String jobName) throws IOException {
- this.remoteDirectoryMgr = new MockRemoteDirectoryManager(jobName);
- }
-
- @Override
- public RemoteDirectoryManager getRemoteDirectoryManager() {
- return remoteDirectoryMgr;
- }
-
- public void setRemoteDirectoryMgr(
- RemoteDirectoryManager remoteDirectoryMgr) {
- this.remoteDirectoryMgr = remoteDirectoryMgr;
- }
-}
diff --git a/submarine-commons/commons-runtime/src/test/java/org/apache/submarine/commons/runtime/fs/MockRemoteDirectoryManager.java b/submarine-commons/commons-runtime/src/test/java/org/apache/submarine/commons/runtime/fs/MockRemoteDirectoryManager.java
deleted file mode 100644
index 13fa78e..0000000
--- a/submarine-commons/commons-runtime/src/test/java/org/apache/submarine/commons/runtime/fs/MockRemoteDirectoryManager.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.submarine.commons.runtime.fs;
-
-import org.apache.commons.io.FileUtils;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.util.Time;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.Objects;
-
-public class MockRemoteDirectoryManager implements RemoteDirectoryManager {
-
- private static final String FAILED_TO_CREATE_DIRS_FORMAT_STRING =
- "Failed to create directories under path: %s";
- private static final String JOB_NAME_MUST_NOT_BE_NULL =
- "Job name must not be null!";
- private static final File STAGING_AREA = new File("target/_staging_area_");
-
- private File jobsParentDir;
- private File jobDir;
- private File modelParentDir;
-
- public MockRemoteDirectoryManager(String jobName) throws IOException {
- Objects.requireNonNull(jobName, JOB_NAME_MUST_NOT_BE_NULL);
- this.cleanup();
- this.jobsParentDir = initializeJobParentDir();
- this.jobDir = initializeJobDir(jobName);
- this.modelParentDir = initializeModelParentDir();
- }
-
- private void cleanup() throws IOException {
- FileUtils.deleteDirectory(STAGING_AREA);
- }
-
- private File initializeJobParentDir() throws IOException {
- File dir = new File(STAGING_AREA, String.valueOf(Time.monotonicNow()));
- if (!dir.mkdirs()) {
- throw new IOException(
- String.format(FAILED_TO_CREATE_DIRS_FORMAT_STRING,
- dir.getAbsolutePath()));
- }
- return dir;
- }
-
- private File initializeJobDir(String jobName) throws IOException {
- Objects.requireNonNull(jobsParentDir, "Job parent dir must not be null!");
- File dir = new File(jobsParentDir.getAbsolutePath(), jobName);
-
- if (!dir.exists() && !dir.mkdirs()) {
- throw new IOException(
- String.format(FAILED_TO_CREATE_DIRS_FORMAT_STRING,
- dir.getAbsolutePath()));
- }
- return dir;
- }
-
- private File initializeModelParentDir() throws IOException {
- File dir = new File(
- "target/_models_" + System.currentTimeMillis());
- if (!dir.exists() && !dir.mkdirs()) {
- throw new IOException(
- String.format(FAILED_TO_CREATE_DIRS_FORMAT_STRING,
- dir.getAbsolutePath()));
- }
- return dir;
- }
-
- @Override
- public Path getJobStagingArea(String jobName, boolean create)
- throws IOException {
- Objects.requireNonNull(jobName, JOB_NAME_MUST_NOT_BE_NULL);
- Objects.requireNonNull(jobDir, JOB_NAME_MUST_NOT_BE_NULL);
- this.jobDir = initializeJobDir(jobName);
- if (create && !jobDir.exists()) {
- if (!jobDir.mkdirs()) {
- throw new IOException(
- String.format(FAILED_TO_CREATE_DIRS_FORMAT_STRING,
- jobDir.getAbsolutePath()));
- }
- }
- return new Path(jobDir.getAbsolutePath());
- }
-
- @Override
- public Path getJobCheckpointDir(String jobName, boolean create)
- throws IOException {
- return new Path("s3://generated_checkpoint_dir");
- }
-
- @Override
- public Path getModelDir(String modelName, boolean create)
- throws IOException {
- Objects.requireNonNull(modelParentDir, "Model parent dir must not be null!");
- File modelDir = new File(modelParentDir.getAbsolutePath(), modelName);
- if (create) {
- if (!modelDir.exists() && !modelDir.mkdirs()) {
- throw new IOException("Failed to mkdirs for "
- + modelDir.getAbsolutePath());
- }
- }
- return new Path(modelDir.getAbsolutePath());
- }
-
- @Override
- public FileSystem getDefaultFileSystem() throws IOException {
- return FileSystem.getLocal(new Configuration());
- }
-
- @Override
- public FileSystem getFileSystemByUri(String uri) throws IOException {
- return getDefaultFileSystem();
- }
-
- @Override
- public Path getUserRootFolder() throws IOException {
- return new Path("s3://generated_root_dir");
- }
-
- @Override
- public boolean isDir(String uri) throws IOException {
- return getDefaultFileSystem().getFileStatus(
- new Path(convertToStagingPath(uri))).isDirectory();
- }
-
- @Override
- public boolean isRemote(String uri) throws IOException {
- String scheme = new Path(uri).toUri().getScheme();
- if (null == scheme) {
- return false;
- }
- return !scheme.startsWith("file://");
- }
-
- private String convertToStagingPath(String uri) throws IOException {
- if (isRemote(uri)) {
- String dirName = new Path(uri).getName();
- return this.jobDir.getAbsolutePath()
- + "/" + dirName;
- }
- return uri;
- }
-
- /**
- * We use staging dir as mock HDFS dir.
- * */
- @Override
- public boolean copyRemoteToLocal(String remoteUri, String localUri)
- throws IOException {
- // mock the copy from HDFS into a local copy
- Path remoteToLocalDir = new Path(convertToStagingPath(remoteUri));
- File old = new File(convertToStagingPath(localUri));
- if (old.isDirectory() && old.exists()) {
- if (!FileUtil.fullyDelete(old)) {
- throw new IOException("Cannot delete temp dir:"
- + old.getAbsolutePath());
- }
- }
- return FileUtil.copy(getDefaultFileSystem(), remoteToLocalDir,
- new File(localUri), false,
- getDefaultFileSystem().getConf());
- }
-
- @Override
- public boolean existsRemoteFile(String uri) throws IOException {
- String dirName = new Path(uri).getName();
- String fakeLocalFilePath = this.jobDir.getAbsolutePath()
- + "/" + dirName;
- return new File(fakeLocalFilePath).exists();
- }
-
- @Override
- public FileStatus getRemoteFileStatus(String uri) throws IOException {
- return getDefaultFileSystem().getFileStatus(new Path(
- convertToStagingPath(uri)));
- }
-
- @Override
- public long getRemoteFileSize(String uri) throws IOException {
- // 5 byte for this file to test
- if (uri.equals("https://a/b/1.patch")) {
- return 5;
- }
- return 100 * 1024 * 1024;
- }
-
- public void setJobDir(File jobDir) {
- this.jobDir = jobDir;
- }
-}
diff --git a/submarine-dist/pom.xml b/submarine-dist/pom.xml
index c0fdb9e..19dbefc 100644
--- a/submarine-dist/pom.xml
+++ b/submarine-dist/pom.xml
@@ -33,16 +33,6 @@
<name>Submarine: Dist</name>
<packaging>pom</packaging>
- <dependencyManagement>
- <dependencies>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-yarn-services-core</artifactId>
- <version>${hadoop.version}</version>
- </dependency>
- </dependencies>
- </dependencyManagement>
-
<dependencies>
<dependency>
<groupId>org.apache.submarine</groupId>
@@ -104,7 +94,7 @@
<goal>single</goal>
</goals>
<configuration>
- <finalName>${project.artifactId}-${project.version}-${profile-id}</finalName>
+ <finalName>${project.artifactId}-${project.version}</finalName>
<appendAssemblyId>false</appendAssemblyId>
<attach>false</attach>
<descriptors>
diff --git a/submarine-test/test-e2e/pom.xml b/submarine-test/test-e2e/pom.xml
index 4d02c2c..3705fa8 100644
--- a/submarine-test/test-e2e/pom.xml
+++ b/submarine-test/test-e2e/pom.xml
@@ -181,7 +181,7 @@
</activation>
<properties>
<submarine.daemon.package.base>
- ../../submarine-dist/target/submarine-dist-${project.version}-hadoop-2.9/submarine-dist-${project.version}-hadoop-2.9/bin
+ ../../submarine-dist/target/submarine-dist-${project.version}/submarine-dist-${project.version}/bin
</submarine.daemon.package.base>
</properties>
</profile>
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@submarine.apache.org
For additional commands, e-mail: dev-help@submarine.apache.org